xref: /unit/src/nxt_http_source.c (revision 1)
10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
100Sigor@sysoev.ru typedef struct {
110Sigor@sysoev.ru     nxt_http_chunk_parse_t  parse;
120Sigor@sysoev.ru     nxt_source_hook_t       next;
130Sigor@sysoev.ru } nxt_http_source_chunk_t;
140Sigor@sysoev.ru 
150Sigor@sysoev.ru 
160Sigor@sysoev.ru static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
170Sigor@sysoev.ru 
18*1Sigor@sysoev.ru static void nxt_http_source_status_filter(nxt_task_t *task, void *obj,
190Sigor@sysoev.ru     void *data);
20*1Sigor@sysoev.ru static void nxt_http_source_header_filter(nxt_task_t *task, void *obj,
210Sigor@sysoev.ru     void *data);
220Sigor@sysoev.ru 
230Sigor@sysoev.ru static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
240Sigor@sysoev.ru static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
250Sigor@sysoev.ru     nxt_name_value_t *nv);
260Sigor@sysoev.ru static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
270Sigor@sysoev.ru     nxt_name_value_t *nv);
280Sigor@sysoev.ru 
29*1Sigor@sysoev.ru static void nxt_http_source_header_ready(nxt_task_t *task,
30*1Sigor@sysoev.ru     nxt_http_source_t *hs, nxt_buf_t *rest);
31*1Sigor@sysoev.ru static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj,
320Sigor@sysoev.ru     void *data);
33*1Sigor@sysoev.ru static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj,
340Sigor@sysoev.ru     void *data);
35*1Sigor@sysoev.ru static void nxt_http_source_body_filter(nxt_task_t *task, void *obj,
360Sigor@sysoev.ru     void *data);
370Sigor@sysoev.ru 
38*1Sigor@sysoev.ru static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
39*1Sigor@sysoev.ru     nxt_buf_t *b);
40*1Sigor@sysoev.ru static void nxt_http_source_error(nxt_task_t *task,
41*1Sigor@sysoev.ru     nxt_stream_source_t *stream);
42*1Sigor@sysoev.ru static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs);
430Sigor@sysoev.ru static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
440Sigor@sysoev.ru 
450Sigor@sysoev.ru 
460Sigor@sysoev.ru void
47*1Sigor@sysoev.ru nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
480Sigor@sysoev.ru     nxt_http_source_request_create_t request_create)
490Sigor@sysoev.ru {
500Sigor@sysoev.ru     nxt_http_source_t    *hs;
510Sigor@sysoev.ru     nxt_stream_source_t  *stream;
520Sigor@sysoev.ru 
530Sigor@sysoev.ru     hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t));
540Sigor@sysoev.ru     if (nxt_slow_path(hs == NULL)) {
550Sigor@sysoev.ru         goto fail;
560Sigor@sysoev.ru     }
570Sigor@sysoev.ru 
580Sigor@sysoev.ru     us->protocol_source = hs;
590Sigor@sysoev.ru 
600Sigor@sysoev.ru     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
610Sigor@sysoev.ru                                          sizeof(nxt_name_value_t));
620Sigor@sysoev.ru     if (nxt_slow_path(hs->header_in.list == NULL)) {
630Sigor@sysoev.ru         goto fail;
640Sigor@sysoev.ru     }
650Sigor@sysoev.ru 
660Sigor@sysoev.ru     hs->header_in.hash = us->header_hash;
670Sigor@sysoev.ru     hs->upstream = us;
680Sigor@sysoev.ru     hs->request_create = request_create;
690Sigor@sysoev.ru 
700Sigor@sysoev.ru     stream = us->stream;
710Sigor@sysoev.ru 
720Sigor@sysoev.ru     if (stream == NULL) {
730Sigor@sysoev.ru         stream = nxt_mem_zalloc(us->buffers.mem_pool,
740Sigor@sysoev.ru                                 sizeof(nxt_stream_source_t));
750Sigor@sysoev.ru         if (nxt_slow_path(stream == NULL)) {
760Sigor@sysoev.ru             goto fail;
770Sigor@sysoev.ru         }
780Sigor@sysoev.ru 
790Sigor@sysoev.ru         us->stream = stream;
800Sigor@sysoev.ru         stream->upstream = us;
810Sigor@sysoev.ru 
820Sigor@sysoev.ru     } else {
830Sigor@sysoev.ru         nxt_memzero(stream, sizeof(nxt_stream_source_t));
840Sigor@sysoev.ru     }
850Sigor@sysoev.ru 
860Sigor@sysoev.ru     /*
870Sigor@sysoev.ru      * Create the HTTP source filter chain:
880Sigor@sysoev.ru      *   stream source | HTTP status line filter
890Sigor@sysoev.ru      */
900Sigor@sysoev.ru     stream->next = &hs->query;
910Sigor@sysoev.ru     stream->error_handler = nxt_http_source_error;
920Sigor@sysoev.ru 
930Sigor@sysoev.ru     hs->query.context = hs;
940Sigor@sysoev.ru     hs->query.filter = nxt_http_source_status_filter;
950Sigor@sysoev.ru 
960Sigor@sysoev.ru     hs->header_in.content_length = -1;
970Sigor@sysoev.ru 
980Sigor@sysoev.ru     stream->out = nxt_http_source_request_create(hs);
990Sigor@sysoev.ru 
1000Sigor@sysoev.ru     if (nxt_fast_path(stream->out != NULL)) {
1010Sigor@sysoev.ru         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
1020Sigor@sysoev.ru 
103*1Sigor@sysoev.ru         nxt_stream_source_connect(task, stream);
1040Sigor@sysoev.ru         return;
1050Sigor@sysoev.ru     }
1060Sigor@sysoev.ru 
1070Sigor@sysoev.ru fail:
1080Sigor@sysoev.ru 
109*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
1100Sigor@sysoev.ru }
1110Sigor@sysoev.ru 
1120Sigor@sysoev.ru 
1130Sigor@sysoev.ru nxt_inline u_char *
1140Sigor@sysoev.ru nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
1150Sigor@sysoev.ru {
1160Sigor@sysoev.ru     u_char  *s;
1170Sigor@sysoev.ru 
1180Sigor@sysoev.ru     if (nxt_fast_path(len >= src->len)) {
1190Sigor@sysoev.ru         len = src->len;
1200Sigor@sysoev.ru         src->len = 0;
1210Sigor@sysoev.ru 
1220Sigor@sysoev.ru     } else {
1230Sigor@sysoev.ru         src->len -= len;
1240Sigor@sysoev.ru     }
1250Sigor@sysoev.ru 
1260Sigor@sysoev.ru     s = src->data;
1270Sigor@sysoev.ru     src->data += len;
1280Sigor@sysoev.ru 
1290Sigor@sysoev.ru     return nxt_cpymem(p, s, len);
1300Sigor@sysoev.ru }
1310Sigor@sysoev.ru 
1320Sigor@sysoev.ru 
1330Sigor@sysoev.ru static nxt_buf_t *
1340Sigor@sysoev.ru nxt_http_source_request_create(nxt_http_source_t *hs)
1350Sigor@sysoev.ru {
1360Sigor@sysoev.ru     nxt_int_t  ret;
1370Sigor@sysoev.ru     nxt_buf_t  *b, *req, **prev;
1380Sigor@sysoev.ru 
1390Sigor@sysoev.ru     nxt_thread_log_debug("http source create request");
1400Sigor@sysoev.ru 
1410Sigor@sysoev.ru     prev = &req;
1420Sigor@sysoev.ru 
1430Sigor@sysoev.ru new_buffer:
1440Sigor@sysoev.ru 
1450Sigor@sysoev.ru     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
1460Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
1470Sigor@sysoev.ru         return NULL;
1480Sigor@sysoev.ru     }
1490Sigor@sysoev.ru 
1500Sigor@sysoev.ru     b = hs->upstream->buffers.current;
1510Sigor@sysoev.ru     hs->upstream->buffers.current = NULL;
1520Sigor@sysoev.ru 
1530Sigor@sysoev.ru     *prev = b;
1540Sigor@sysoev.ru     prev = &b->next;
1550Sigor@sysoev.ru 
1560Sigor@sysoev.ru     for ( ;; ) {
1570Sigor@sysoev.ru         ret = hs->request_create(hs);
1580Sigor@sysoev.ru 
1590Sigor@sysoev.ru         if (nxt_fast_path(ret == NXT_OK)) {
1600Sigor@sysoev.ru             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
1610Sigor@sysoev.ru                                                b->mem.end - b->mem.free);
1620Sigor@sysoev.ru 
1630Sigor@sysoev.ru             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
1640Sigor@sysoev.ru                 continue;
1650Sigor@sysoev.ru             }
1660Sigor@sysoev.ru 
1670Sigor@sysoev.ru             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
1680Sigor@sysoev.ru                                  b->mem.pos);
1690Sigor@sysoev.ru 
1700Sigor@sysoev.ru             goto new_buffer;
1710Sigor@sysoev.ru         }
1720Sigor@sysoev.ru 
1730Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
1740Sigor@sysoev.ru             return NULL;
1750Sigor@sysoev.ru         }
1760Sigor@sysoev.ru 
1770Sigor@sysoev.ru         /* ret == NXT_DONE */
1780Sigor@sysoev.ru         break;
1790Sigor@sysoev.ru     }
1800Sigor@sysoev.ru 
1810Sigor@sysoev.ru     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
1820Sigor@sysoev.ru 
1830Sigor@sysoev.ru     return req;
1840Sigor@sysoev.ru }
1850Sigor@sysoev.ru 
1860Sigor@sysoev.ru 
1870Sigor@sysoev.ru static void
188*1Sigor@sysoev.ru nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data)
1890Sigor@sysoev.ru {
1900Sigor@sysoev.ru     nxt_int_t          ret;
1910Sigor@sysoev.ru     nxt_buf_t          *b;
1920Sigor@sysoev.ru     nxt_http_source_t  *hs;
1930Sigor@sysoev.ru 
1940Sigor@sysoev.ru     hs = obj;
1950Sigor@sysoev.ru     b = data;
1960Sigor@sysoev.ru 
1970Sigor@sysoev.ru     /*
1980Sigor@sysoev.ru      * No cycle over buffer chain is required since at
1990Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
2000Sigor@sysoev.ru      */
2010Sigor@sysoev.ru 
202*1Sigor@sysoev.ru     nxt_debug(task, "http source status filter");
2030Sigor@sysoev.ru 
2040Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
205*1Sigor@sysoev.ru         nxt_http_source_sync_buffer(task, hs, b);
2060Sigor@sysoev.ru         return;
2070Sigor@sysoev.ru     }
2080Sigor@sysoev.ru 
2090Sigor@sysoev.ru     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
2100Sigor@sysoev.ru 
2110Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_OK)) {
2120Sigor@sysoev.ru         /*
2130Sigor@sysoev.ru          * Change the HTTP source filter chain:
2140Sigor@sysoev.ru          *    stream source | HTTP header filter
2150Sigor@sysoev.ru          */
2160Sigor@sysoev.ru         hs->query.filter = nxt_http_source_header_filter;
2170Sigor@sysoev.ru 
218*1Sigor@sysoev.ru         nxt_debug(task, "upstream status: \"%*s\"",
219*1Sigor@sysoev.ru                   hs->u.status_parse.end - b->mem.start, b->mem.start);
2200Sigor@sysoev.ru 
2210Sigor@sysoev.ru         hs->header_in.status = hs->u.status_parse.code;
2220Sigor@sysoev.ru 
223*1Sigor@sysoev.ru         nxt_debug(task, "upstream version:%d status:%uD \"%*s\"",
224*1Sigor@sysoev.ru                   hs->u.status_parse.http_version,
225*1Sigor@sysoev.ru                   hs->u.status_parse.code,
226*1Sigor@sysoev.ru                   hs->u.status_parse.end - hs->u.status_parse.start,
227*1Sigor@sysoev.ru                   hs->u.status_parse.start);
2280Sigor@sysoev.ru 
2290Sigor@sysoev.ru         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
2300Sigor@sysoev.ru         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
2310Sigor@sysoev.ru 
232*1Sigor@sysoev.ru         nxt_http_source_header_filter(task, hs, b);
2330Sigor@sysoev.ru         return;
2340Sigor@sysoev.ru     }
2350Sigor@sysoev.ru 
2360Sigor@sysoev.ru     if (nxt_slow_path(ret == NXT_ERROR)) {
2370Sigor@sysoev.ru         /* HTTP/0.9 response. */
2380Sigor@sysoev.ru         hs->header_in.status = 200;
239*1Sigor@sysoev.ru         nxt_http_source_header_ready(task, hs, b);
2400Sigor@sysoev.ru         return;
2410Sigor@sysoev.ru     }
2420Sigor@sysoev.ru 
2430Sigor@sysoev.ru     /* ret == NXT_AGAIN */
2440Sigor@sysoev.ru 
2450Sigor@sysoev.ru     /*
2460Sigor@sysoev.ru      * b->mem.pos is always equal to b->mem.end because b is a buffer
2470Sigor@sysoev.ru      * which points to a response part read by the stream source.
2480Sigor@sysoev.ru      * However, since the stream source is an immediate source of the
2490Sigor@sysoev.ru      * status filter, b->parent is a buffer the stream source reads in.
2500Sigor@sysoev.ru      */
2510Sigor@sysoev.ru     if (b->parent->mem.pos == b->parent->mem.end) {
2520Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
2530Sigor@sysoev.ru                                 b->mem.pos - b->mem.start, b->mem.start);
2540Sigor@sysoev.ru 
255*1Sigor@sysoev.ru         nxt_http_source_fail(task, hs);
2560Sigor@sysoev.ru     }
2570Sigor@sysoev.ru }
2580Sigor@sysoev.ru 
2590Sigor@sysoev.ru 
2600Sigor@sysoev.ru static void
261*1Sigor@sysoev.ru nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data)
2620Sigor@sysoev.ru {
2630Sigor@sysoev.ru     nxt_int_t          ret;
2640Sigor@sysoev.ru     nxt_buf_t          *b;
2650Sigor@sysoev.ru     nxt_http_source_t  *hs;
2660Sigor@sysoev.ru 
2670Sigor@sysoev.ru     hs = obj;
2680Sigor@sysoev.ru     b = data;
2690Sigor@sysoev.ru 
2700Sigor@sysoev.ru     /*
2710Sigor@sysoev.ru      * No cycle over buffer chain is required since at
2720Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
2730Sigor@sysoev.ru      */
2740Sigor@sysoev.ru 
275*1Sigor@sysoev.ru     nxt_debug(task, "http source header filter");
2760Sigor@sysoev.ru 
2770Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
278*1Sigor@sysoev.ru         nxt_http_source_sync_buffer(task, hs, b);
2790Sigor@sysoev.ru         return;
2800Sigor@sysoev.ru     }
2810Sigor@sysoev.ru 
2820Sigor@sysoev.ru     for ( ;; ) {
2830Sigor@sysoev.ru         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
2840Sigor@sysoev.ru 
2850Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
2860Sigor@sysoev.ru             break;
2870Sigor@sysoev.ru         }
2880Sigor@sysoev.ru 
2890Sigor@sysoev.ru         ret = nxt_http_source_header_line_process(hs);
2900Sigor@sysoev.ru 
2910Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
2920Sigor@sysoev.ru             break;
2930Sigor@sysoev.ru         }
2940Sigor@sysoev.ru     }
2950Sigor@sysoev.ru 
2960Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_DONE)) {
297*1Sigor@sysoev.ru         nxt_debug(task, "http source header done");
298*1Sigor@sysoev.ru         nxt_http_source_header_ready(task, hs, b);
2990Sigor@sysoev.ru         return;
3000Sigor@sysoev.ru     }
3010Sigor@sysoev.ru 
3020Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_AGAIN)) {
3030Sigor@sysoev.ru         return;
3040Sigor@sysoev.ru     }
3050Sigor@sysoev.ru 
3060Sigor@sysoev.ru     if (ret != NXT_ERROR) {
3070Sigor@sysoev.ru         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
308*1Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,
309*1Sigor@sysoev.ru                 "upstream sent invalid header line: \"%*s\\r...\"",
310*1Sigor@sysoev.ru                 hs->u.header.parse.header_end
311*1Sigor@sysoev.ru                     - hs->u.header.parse.header_name_start,
312*1Sigor@sysoev.ru                 hs->u.header.parse.header_name_start);
3130Sigor@sysoev.ru     }
3140Sigor@sysoev.ru 
3150Sigor@sysoev.ru     /* ret == NXT_ERROR */
3160Sigor@sysoev.ru 
317*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
3180Sigor@sysoev.ru }
3190Sigor@sysoev.ru 
3200Sigor@sysoev.ru 
3210Sigor@sysoev.ru static nxt_int_t
3220Sigor@sysoev.ru nxt_http_source_header_line_process(nxt_http_source_t *hs)
3230Sigor@sysoev.ru {
3240Sigor@sysoev.ru     size_t                     name_len;
3250Sigor@sysoev.ru     nxt_name_value_t           *nv;
3260Sigor@sysoev.ru     nxt_lvlhsh_query_t         lhq;
3270Sigor@sysoev.ru     nxt_http_header_parse_t    *hp;
3280Sigor@sysoev.ru     nxt_upstream_name_value_t  *unv;
3290Sigor@sysoev.ru 
3300Sigor@sysoev.ru     hp = &hs->u.header.parse;
3310Sigor@sysoev.ru 
3320Sigor@sysoev.ru     name_len = hp->header_name_end - hp->header_name_start;
3330Sigor@sysoev.ru 
3340Sigor@sysoev.ru     if (name_len > 255) {
3350Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long header field name: "
3360Sigor@sysoev.ru                                 "\"%*s\"", name_len, hp->header_name_start);
3370Sigor@sysoev.ru         return NXT_ERROR;
3380Sigor@sysoev.ru     }
3390Sigor@sysoev.ru 
3400Sigor@sysoev.ru     nv = nxt_list_add(hs->header_in.list);
3410Sigor@sysoev.ru     if (nxt_slow_path(nv == NULL)) {
3420Sigor@sysoev.ru         return NXT_ERROR;
3430Sigor@sysoev.ru     }
3440Sigor@sysoev.ru 
3450Sigor@sysoev.ru     nv->hash = hp->header_hash;
3460Sigor@sysoev.ru     nv->skip = 0;
3470Sigor@sysoev.ru     nv->name_len = name_len;
3480Sigor@sysoev.ru     nv->name_start = hp->header_name_start;
3490Sigor@sysoev.ru     nv->value_len = hp->header_end - hp->header_start;
3500Sigor@sysoev.ru     nv->value_start = hp->header_start;
3510Sigor@sysoev.ru 
3520Sigor@sysoev.ru     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
3530Sigor@sysoev.ru                          nv->name_len, nv->name_start,
3540Sigor@sysoev.ru                          nv->value_len, nv->value_start);
3550Sigor@sysoev.ru 
3560Sigor@sysoev.ru     lhq.key_hash = nv->hash;
3570Sigor@sysoev.ru     lhq.key.len = nv->name_len;
3580Sigor@sysoev.ru     lhq.key.data = nv->name_start;
3590Sigor@sysoev.ru     lhq.proto = &nxt_upstream_header_hash_proto;
3600Sigor@sysoev.ru 
3610Sigor@sysoev.ru     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
3620Sigor@sysoev.ru         unv = lhq.value;
3630Sigor@sysoev.ru 
3640Sigor@sysoev.ru         if (unv->handler(hs->upstream, nv) != NXT_OK) {
3650Sigor@sysoev.ru             return NXT_ERROR;
3660Sigor@sysoev.ru         }
3670Sigor@sysoev.ru     }
3680Sigor@sysoev.ru 
3690Sigor@sysoev.ru     return NXT_OK;
3700Sigor@sysoev.ru }
3710Sigor@sysoev.ru 
3720Sigor@sysoev.ru 
3730Sigor@sysoev.ru static const nxt_upstream_name_value_t  nxt_http_source_headers[]
3740Sigor@sysoev.ru     nxt_aligned(32) =
3750Sigor@sysoev.ru {
3760Sigor@sysoev.ru     { nxt_http_source_content_length,
3770Sigor@sysoev.ru       nxt_upstream_name_value("content-length") },
3780Sigor@sysoev.ru 
3790Sigor@sysoev.ru     { nxt_http_source_transfer_encoding,
3800Sigor@sysoev.ru       nxt_upstream_name_value("transfer-encoding") },
3810Sigor@sysoev.ru };
3820Sigor@sysoev.ru 
3830Sigor@sysoev.ru 
3840Sigor@sysoev.ru nxt_int_t
3850Sigor@sysoev.ru nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
3860Sigor@sysoev.ru {
3870Sigor@sysoev.ru     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
3880Sigor@sysoev.ru                                         nxt_nitems(nxt_http_source_headers));
3890Sigor@sysoev.ru }
3900Sigor@sysoev.ru 
3910Sigor@sysoev.ru 
3920Sigor@sysoev.ru static nxt_int_t
3930Sigor@sysoev.ru nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
3940Sigor@sysoev.ru {
3950Sigor@sysoev.ru     nxt_off_t          length;
3960Sigor@sysoev.ru     nxt_http_source_t  *hs;
3970Sigor@sysoev.ru 
3980Sigor@sysoev.ru     length = nxt_off_t_parse(nv->value_start, nv->value_len);
3990Sigor@sysoev.ru 
4000Sigor@sysoev.ru     if (nxt_fast_path(length > 0)) {
4010Sigor@sysoev.ru         hs = us->protocol_source;
4020Sigor@sysoev.ru         hs->header_in.content_length = length;
4030Sigor@sysoev.ru         return NXT_OK;
4040Sigor@sysoev.ru     }
4050Sigor@sysoev.ru 
4060Sigor@sysoev.ru     return NXT_ERROR;
4070Sigor@sysoev.ru }
4080Sigor@sysoev.ru 
4090Sigor@sysoev.ru 
4100Sigor@sysoev.ru static nxt_int_t
4110Sigor@sysoev.ru nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
4120Sigor@sysoev.ru     nxt_name_value_t *nv)
4130Sigor@sysoev.ru {
4140Sigor@sysoev.ru     u_char             *end;
4150Sigor@sysoev.ru     nxt_http_source_t  *hs;
4160Sigor@sysoev.ru 
4170Sigor@sysoev.ru     end = nv->value_start + nv->value_len;
4180Sigor@sysoev.ru 
4190Sigor@sysoev.ru     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
4200Sigor@sysoev.ru         hs = us->protocol_source;
4210Sigor@sysoev.ru         hs->chunked = 1;
4220Sigor@sysoev.ru     }
4230Sigor@sysoev.ru 
4240Sigor@sysoev.ru     return NXT_OK;
4250Sigor@sysoev.ru }
4260Sigor@sysoev.ru 
4270Sigor@sysoev.ru 
4280Sigor@sysoev.ru static void
429*1Sigor@sysoev.ru nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs,
430*1Sigor@sysoev.ru     nxt_buf_t *rest)
4310Sigor@sysoev.ru {
4320Sigor@sysoev.ru     nxt_buf_t                *b;
4330Sigor@sysoev.ru     nxt_upstream_source_t    *us;
4340Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
4350Sigor@sysoev.ru 
4360Sigor@sysoev.ru     us = hs->upstream;
4370Sigor@sysoev.ru 
4380Sigor@sysoev.ru     /* Free buffers used for request header. */
4390Sigor@sysoev.ru 
4400Sigor@sysoev.ru     for (b = us->stream->out; b != NULL; b = b->next) {
4410Sigor@sysoev.ru         nxt_buf_pool_free(&us->buffers, b);
4420Sigor@sysoev.ru     }
4430Sigor@sysoev.ru 
4440Sigor@sysoev.ru     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
4450Sigor@sysoev.ru 
4460Sigor@sysoev.ru         if (hs->chunked) {
4470Sigor@sysoev.ru             hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool,
4480Sigor@sysoev.ru                                  sizeof(nxt_http_source_chunk_t));
4490Sigor@sysoev.ru             if (nxt_slow_path(hsc == NULL)) {
4500Sigor@sysoev.ru                 goto fail;
4510Sigor@sysoev.ru             }
4520Sigor@sysoev.ru 
4530Sigor@sysoev.ru             /*
4540Sigor@sysoev.ru              * Change the HTTP source filter chain:
4550Sigor@sysoev.ru              *    stream source | chunk filter | HTTP body filter
4560Sigor@sysoev.ru              */
4570Sigor@sysoev.ru             hs->query.context = hsc;
4580Sigor@sysoev.ru             hs->query.filter = nxt_http_source_chunk_filter;
4590Sigor@sysoev.ru 
4600Sigor@sysoev.ru             hsc->next.context = hs;
4610Sigor@sysoev.ru             hsc->next.filter = nxt_http_source_body_filter;
4620Sigor@sysoev.ru 
4630Sigor@sysoev.ru             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
4640Sigor@sysoev.ru 
4650Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
466*1Sigor@sysoev.ru                 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest);
4670Sigor@sysoev.ru 
4680Sigor@sysoev.ru                 if (nxt_slow_path(hs->rest == NULL)) {
4690Sigor@sysoev.ru                     goto fail;
4700Sigor@sysoev.ru                 }
4710Sigor@sysoev.ru             }
4720Sigor@sysoev.ru 
4730Sigor@sysoev.ru         } else {
4740Sigor@sysoev.ru             /*
4750Sigor@sysoev.ru              * Change the HTTP source filter chain:
4760Sigor@sysoev.ru              *    stream source | HTTP body filter
4770Sigor@sysoev.ru              */
4780Sigor@sysoev.ru             hs->query.filter = nxt_http_source_body_filter;
4790Sigor@sysoev.ru 
4800Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
4810Sigor@sysoev.ru                 hs->rest = rest;
4820Sigor@sysoev.ru             }
4830Sigor@sysoev.ru         }
4840Sigor@sysoev.ru 
4850Sigor@sysoev.ru         hs->upstream->state->ready_handler(hs);
4860Sigor@sysoev.ru         return;
4870Sigor@sysoev.ru     }
4880Sigor@sysoev.ru 
4890Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
4900Sigor@sysoev.ru                          "are not enough to read upstream response",
4910Sigor@sysoev.ru                          us->buffers.max, us->buffers.size / 1024);
4920Sigor@sysoev.ru fail:
4930Sigor@sysoev.ru 
494*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
4950Sigor@sysoev.ru }
4960Sigor@sysoev.ru 
4970Sigor@sysoev.ru 
4980Sigor@sysoev.ru static void
499*1Sigor@sysoev.ru nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data)
5000Sigor@sysoev.ru {
5010Sigor@sysoev.ru     nxt_buf_t                *b;
5020Sigor@sysoev.ru     nxt_http_source_t        *hs;
5030Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
5040Sigor@sysoev.ru 
5050Sigor@sysoev.ru     hsc = obj;
5060Sigor@sysoev.ru     b = data;
5070Sigor@sysoev.ru 
508*1Sigor@sysoev.ru     nxt_debug(task, "http source chunk filter");
5090Sigor@sysoev.ru 
510*1Sigor@sysoev.ru     b = nxt_http_chunk_parse(task, &hsc->parse, b);
5110Sigor@sysoev.ru 
5120Sigor@sysoev.ru     hs = hsc->next.context;
5130Sigor@sysoev.ru 
5140Sigor@sysoev.ru     if (hsc->parse.error) {
515*1Sigor@sysoev.ru         nxt_http_source_fail(task, hs);
5160Sigor@sysoev.ru         return;
5170Sigor@sysoev.ru     }
5180Sigor@sysoev.ru 
5190Sigor@sysoev.ru     if (hsc->parse.chunk_error) {
5200Sigor@sysoev.ru         /* Output all parsed before a chunk error and close upstream. */
521*1Sigor@sysoev.ru         nxt_thread_current_work_queue_add(task->thread,
522*1Sigor@sysoev.ru                                           nxt_http_source_chunk_error,
523*1Sigor@sysoev.ru                                           task, hs, NULL);
5240Sigor@sysoev.ru     }
5250Sigor@sysoev.ru 
5260Sigor@sysoev.ru     if (b != NULL) {
527*1Sigor@sysoev.ru         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
528*1Sigor@sysoev.ru                           &hsc->next, b);
5290Sigor@sysoev.ru     }
5300Sigor@sysoev.ru }
5310Sigor@sysoev.ru 
5320Sigor@sysoev.ru 
5330Sigor@sysoev.ru static void
534*1Sigor@sysoev.ru nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data)
5350Sigor@sysoev.ru {
5360Sigor@sysoev.ru     nxt_http_source_t  *hs;
5370Sigor@sysoev.ru 
5380Sigor@sysoev.ru     hs = obj;
5390Sigor@sysoev.ru 
540*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
5410Sigor@sysoev.ru }
5420Sigor@sysoev.ru 
5430Sigor@sysoev.ru 
5440Sigor@sysoev.ru /*
5450Sigor@sysoev.ru  * The HTTP source body filter accumulates first body buffers before the next
5460Sigor@sysoev.ru  * filter will be established and sets completion handler for the last buffer.
5470Sigor@sysoev.ru  */
5480Sigor@sysoev.ru 
5490Sigor@sysoev.ru static void
550*1Sigor@sysoev.ru nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data)
5510Sigor@sysoev.ru {
5520Sigor@sysoev.ru     nxt_buf_t          *b, *in;
5530Sigor@sysoev.ru     nxt_http_source_t  *hs;
5540Sigor@sysoev.ru 
5550Sigor@sysoev.ru     hs = obj;
5560Sigor@sysoev.ru     in = data;
5570Sigor@sysoev.ru 
558*1Sigor@sysoev.ru     nxt_debug(task, "http source body filter");
5590Sigor@sysoev.ru 
5600Sigor@sysoev.ru     for (b = in; b != NULL; b = b->next) {
5610Sigor@sysoev.ru 
5620Sigor@sysoev.ru         if (nxt_buf_is_last(b)) {
5630Sigor@sysoev.ru             b->data = hs->upstream->data;
5640Sigor@sysoev.ru             b->completion_handler = hs->upstream->state->completion_handler;
5650Sigor@sysoev.ru         }
5660Sigor@sysoev.ru     }
5670Sigor@sysoev.ru 
5680Sigor@sysoev.ru     if (hs->next != NULL) {
569*1Sigor@sysoev.ru         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
570*1Sigor@sysoev.ru                           hs->next, in);
5710Sigor@sysoev.ru         return;
5720Sigor@sysoev.ru     }
5730Sigor@sysoev.ru 
5740Sigor@sysoev.ru     nxt_buf_chain_add(&hs->rest, in);
5750Sigor@sysoev.ru }
5760Sigor@sysoev.ru 
5770Sigor@sysoev.ru 
5780Sigor@sysoev.ru static void
579*1Sigor@sysoev.ru nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
5800Sigor@sysoev.ru     nxt_buf_t *b)
5810Sigor@sysoev.ru {
5820Sigor@sysoev.ru     if (nxt_buf_is_last(b)) {
583*1Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,
584*1Sigor@sysoev.ru                 "upstream closed prematurely connection");
5850Sigor@sysoev.ru 
5860Sigor@sysoev.ru     } else {
587*1Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not "
588*1Sigor@sysoev.ru                 "enough to process upstream response header",
589*1Sigor@sysoev.ru                 hs->upstream->buffers.max, hs->upstream->buffers.size);
5900Sigor@sysoev.ru     }
5910Sigor@sysoev.ru 
5920Sigor@sysoev.ru     /* The stream source sends only the last and the nobuf sync buffer. */
5930Sigor@sysoev.ru 
594*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
5950Sigor@sysoev.ru }
5960Sigor@sysoev.ru 
5970Sigor@sysoev.ru 
5980Sigor@sysoev.ru static void
599*1Sigor@sysoev.ru nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
6000Sigor@sysoev.ru {
6010Sigor@sysoev.ru     nxt_http_source_t  *hs;
6020Sigor@sysoev.ru 
6030Sigor@sysoev.ru     nxt_thread_log_debug("http source error");
6040Sigor@sysoev.ru 
6050Sigor@sysoev.ru     hs = stream->next->context;
606*1Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
6070Sigor@sysoev.ru }
6080Sigor@sysoev.ru 
6090Sigor@sysoev.ru 
6100Sigor@sysoev.ru static void
611*1Sigor@sysoev.ru nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs)
6120Sigor@sysoev.ru {
613*1Sigor@sysoev.ru     nxt_debug(task, "http source fail");
6140Sigor@sysoev.ru 
6150Sigor@sysoev.ru     /* TODO: fail, next upstream, or bad gateway */
6160Sigor@sysoev.ru 
617*1Sigor@sysoev.ru     hs->upstream->state->error_handler(task, hs, NULL);
6180Sigor@sysoev.ru }
6190Sigor@sysoev.ru 
6200Sigor@sysoev.ru 
6210Sigor@sysoev.ru static void
6220Sigor@sysoev.ru nxt_http_source_message(const char *msg, size_t len, u_char *p)
6230Sigor@sysoev.ru {
6240Sigor@sysoev.ru     if (len > NXT_MAX_ERROR_STR - 300) {
6250Sigor@sysoev.ru         len = NXT_MAX_ERROR_STR - 300;
6260Sigor@sysoev.ru         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
6270Sigor@sysoev.ru     }
6280Sigor@sysoev.ru 
6290Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
6300Sigor@sysoev.ru }
631