xref: /unit/src/nxt_http_source.c (revision 65)
10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
100Sigor@sysoev.ru typedef struct {
110Sigor@sysoev.ru     nxt_http_chunk_parse_t  parse;
120Sigor@sysoev.ru     nxt_source_hook_t       next;
130Sigor@sysoev.ru } nxt_http_source_chunk_t;
140Sigor@sysoev.ru 
150Sigor@sysoev.ru 
160Sigor@sysoev.ru static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
170Sigor@sysoev.ru 
181Sigor@sysoev.ru static void nxt_http_source_status_filter(nxt_task_t *task, void *obj,
190Sigor@sysoev.ru     void *data);
201Sigor@sysoev.ru static void nxt_http_source_header_filter(nxt_task_t *task, void *obj,
210Sigor@sysoev.ru     void *data);
220Sigor@sysoev.ru 
230Sigor@sysoev.ru static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
240Sigor@sysoev.ru static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
250Sigor@sysoev.ru     nxt_name_value_t *nv);
260Sigor@sysoev.ru static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
270Sigor@sysoev.ru     nxt_name_value_t *nv);
280Sigor@sysoev.ru 
291Sigor@sysoev.ru static void nxt_http_source_header_ready(nxt_task_t *task,
301Sigor@sysoev.ru     nxt_http_source_t *hs, nxt_buf_t *rest);
311Sigor@sysoev.ru static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj,
320Sigor@sysoev.ru     void *data);
331Sigor@sysoev.ru static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj,
340Sigor@sysoev.ru     void *data);
351Sigor@sysoev.ru static void nxt_http_source_body_filter(nxt_task_t *task, void *obj,
360Sigor@sysoev.ru     void *data);
370Sigor@sysoev.ru 
381Sigor@sysoev.ru static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
391Sigor@sysoev.ru     nxt_buf_t *b);
401Sigor@sysoev.ru static void nxt_http_source_error(nxt_task_t *task,
411Sigor@sysoev.ru     nxt_stream_source_t *stream);
421Sigor@sysoev.ru static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs);
430Sigor@sysoev.ru static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
440Sigor@sysoev.ru 
450Sigor@sysoev.ru 
460Sigor@sysoev.ru void
471Sigor@sysoev.ru nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
480Sigor@sysoev.ru     nxt_http_source_request_create_t request_create)
490Sigor@sysoev.ru {
500Sigor@sysoev.ru     nxt_http_source_t    *hs;
510Sigor@sysoev.ru     nxt_stream_source_t  *stream;
520Sigor@sysoev.ru 
53*65Sigor@sysoev.ru     hs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_http_source_t));
540Sigor@sysoev.ru     if (nxt_slow_path(hs == NULL)) {
550Sigor@sysoev.ru         goto fail;
560Sigor@sysoev.ru     }
570Sigor@sysoev.ru 
580Sigor@sysoev.ru     us->protocol_source = hs;
590Sigor@sysoev.ru 
600Sigor@sysoev.ru     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
610Sigor@sysoev.ru                                          sizeof(nxt_name_value_t));
620Sigor@sysoev.ru     if (nxt_slow_path(hs->header_in.list == NULL)) {
630Sigor@sysoev.ru         goto fail;
640Sigor@sysoev.ru     }
650Sigor@sysoev.ru 
660Sigor@sysoev.ru     hs->header_in.hash = us->header_hash;
670Sigor@sysoev.ru     hs->upstream = us;
680Sigor@sysoev.ru     hs->request_create = request_create;
690Sigor@sysoev.ru 
700Sigor@sysoev.ru     stream = us->stream;
710Sigor@sysoev.ru 
720Sigor@sysoev.ru     if (stream == NULL) {
73*65Sigor@sysoev.ru         stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t));
740Sigor@sysoev.ru         if (nxt_slow_path(stream == NULL)) {
750Sigor@sysoev.ru             goto fail;
760Sigor@sysoev.ru         }
770Sigor@sysoev.ru 
780Sigor@sysoev.ru         us->stream = stream;
790Sigor@sysoev.ru         stream->upstream = us;
800Sigor@sysoev.ru 
810Sigor@sysoev.ru     } else {
820Sigor@sysoev.ru         nxt_memzero(stream, sizeof(nxt_stream_source_t));
830Sigor@sysoev.ru     }
840Sigor@sysoev.ru 
850Sigor@sysoev.ru     /*
860Sigor@sysoev.ru      * Create the HTTP source filter chain:
870Sigor@sysoev.ru      *   stream source | HTTP status line filter
880Sigor@sysoev.ru      */
890Sigor@sysoev.ru     stream->next = &hs->query;
900Sigor@sysoev.ru     stream->error_handler = nxt_http_source_error;
910Sigor@sysoev.ru 
920Sigor@sysoev.ru     hs->query.context = hs;
930Sigor@sysoev.ru     hs->query.filter = nxt_http_source_status_filter;
940Sigor@sysoev.ru 
950Sigor@sysoev.ru     hs->header_in.content_length = -1;
960Sigor@sysoev.ru 
970Sigor@sysoev.ru     stream->out = nxt_http_source_request_create(hs);
980Sigor@sysoev.ru 
990Sigor@sysoev.ru     if (nxt_fast_path(stream->out != NULL)) {
1000Sigor@sysoev.ru         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
1010Sigor@sysoev.ru 
1021Sigor@sysoev.ru         nxt_stream_source_connect(task, stream);
1030Sigor@sysoev.ru         return;
1040Sigor@sysoev.ru     }
1050Sigor@sysoev.ru 
1060Sigor@sysoev.ru fail:
1070Sigor@sysoev.ru 
1081Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
1090Sigor@sysoev.ru }
1100Sigor@sysoev.ru 
1110Sigor@sysoev.ru 
1120Sigor@sysoev.ru nxt_inline u_char *
1130Sigor@sysoev.ru nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
1140Sigor@sysoev.ru {
1150Sigor@sysoev.ru     u_char  *s;
1160Sigor@sysoev.ru 
1170Sigor@sysoev.ru     if (nxt_fast_path(len >= src->len)) {
1180Sigor@sysoev.ru         len = src->len;
1190Sigor@sysoev.ru         src->len = 0;
1200Sigor@sysoev.ru 
1210Sigor@sysoev.ru     } else {
1220Sigor@sysoev.ru         src->len -= len;
1230Sigor@sysoev.ru     }
1240Sigor@sysoev.ru 
1250Sigor@sysoev.ru     s = src->data;
1260Sigor@sysoev.ru     src->data += len;
1270Sigor@sysoev.ru 
1280Sigor@sysoev.ru     return nxt_cpymem(p, s, len);
1290Sigor@sysoev.ru }
1300Sigor@sysoev.ru 
1310Sigor@sysoev.ru 
1320Sigor@sysoev.ru static nxt_buf_t *
1330Sigor@sysoev.ru nxt_http_source_request_create(nxt_http_source_t *hs)
1340Sigor@sysoev.ru {
1350Sigor@sysoev.ru     nxt_int_t  ret;
1360Sigor@sysoev.ru     nxt_buf_t  *b, *req, **prev;
1370Sigor@sysoev.ru 
1380Sigor@sysoev.ru     nxt_thread_log_debug("http source create request");
1390Sigor@sysoev.ru 
1400Sigor@sysoev.ru     prev = &req;
1410Sigor@sysoev.ru 
1420Sigor@sysoev.ru new_buffer:
1430Sigor@sysoev.ru 
1440Sigor@sysoev.ru     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
1450Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
1460Sigor@sysoev.ru         return NULL;
1470Sigor@sysoev.ru     }
1480Sigor@sysoev.ru 
1490Sigor@sysoev.ru     b = hs->upstream->buffers.current;
1500Sigor@sysoev.ru     hs->upstream->buffers.current = NULL;
1510Sigor@sysoev.ru 
1520Sigor@sysoev.ru     *prev = b;
1530Sigor@sysoev.ru     prev = &b->next;
1540Sigor@sysoev.ru 
1550Sigor@sysoev.ru     for ( ;; ) {
1560Sigor@sysoev.ru         ret = hs->request_create(hs);
1570Sigor@sysoev.ru 
1580Sigor@sysoev.ru         if (nxt_fast_path(ret == NXT_OK)) {
1590Sigor@sysoev.ru             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
1600Sigor@sysoev.ru                                                b->mem.end - b->mem.free);
1610Sigor@sysoev.ru 
1620Sigor@sysoev.ru             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
1630Sigor@sysoev.ru                 continue;
1640Sigor@sysoev.ru             }
1650Sigor@sysoev.ru 
1660Sigor@sysoev.ru             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
1670Sigor@sysoev.ru                                  b->mem.pos);
1680Sigor@sysoev.ru 
1690Sigor@sysoev.ru             goto new_buffer;
1700Sigor@sysoev.ru         }
1710Sigor@sysoev.ru 
1720Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
1730Sigor@sysoev.ru             return NULL;
1740Sigor@sysoev.ru         }
1750Sigor@sysoev.ru 
1760Sigor@sysoev.ru         /* ret == NXT_DONE */
1770Sigor@sysoev.ru         break;
1780Sigor@sysoev.ru     }
1790Sigor@sysoev.ru 
1800Sigor@sysoev.ru     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
1810Sigor@sysoev.ru 
1820Sigor@sysoev.ru     return req;
1830Sigor@sysoev.ru }
1840Sigor@sysoev.ru 
1850Sigor@sysoev.ru 
1860Sigor@sysoev.ru static void
1871Sigor@sysoev.ru nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data)
1880Sigor@sysoev.ru {
1890Sigor@sysoev.ru     nxt_int_t          ret;
1900Sigor@sysoev.ru     nxt_buf_t          *b;
1910Sigor@sysoev.ru     nxt_http_source_t  *hs;
1920Sigor@sysoev.ru 
1930Sigor@sysoev.ru     hs = obj;
1940Sigor@sysoev.ru     b = data;
1950Sigor@sysoev.ru 
1960Sigor@sysoev.ru     /*
1970Sigor@sysoev.ru      * No cycle over buffer chain is required since at
1980Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
1990Sigor@sysoev.ru      */
2000Sigor@sysoev.ru 
2011Sigor@sysoev.ru     nxt_debug(task, "http source status filter");
2020Sigor@sysoev.ru 
2030Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
2041Sigor@sysoev.ru         nxt_http_source_sync_buffer(task, hs, b);
2050Sigor@sysoev.ru         return;
2060Sigor@sysoev.ru     }
2070Sigor@sysoev.ru 
2080Sigor@sysoev.ru     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
2090Sigor@sysoev.ru 
2100Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_OK)) {
2110Sigor@sysoev.ru         /*
2120Sigor@sysoev.ru          * Change the HTTP source filter chain:
2130Sigor@sysoev.ru          *    stream source | HTTP header filter
2140Sigor@sysoev.ru          */
2150Sigor@sysoev.ru         hs->query.filter = nxt_http_source_header_filter;
2160Sigor@sysoev.ru 
2171Sigor@sysoev.ru         nxt_debug(task, "upstream status: \"%*s\"",
2181Sigor@sysoev.ru                   hs->u.status_parse.end - b->mem.start, b->mem.start);
2190Sigor@sysoev.ru 
2200Sigor@sysoev.ru         hs->header_in.status = hs->u.status_parse.code;
2210Sigor@sysoev.ru 
2221Sigor@sysoev.ru         nxt_debug(task, "upstream version:%d status:%uD \"%*s\"",
2231Sigor@sysoev.ru                   hs->u.status_parse.http_version,
2241Sigor@sysoev.ru                   hs->u.status_parse.code,
2251Sigor@sysoev.ru                   hs->u.status_parse.end - hs->u.status_parse.start,
2261Sigor@sysoev.ru                   hs->u.status_parse.start);
2270Sigor@sysoev.ru 
2280Sigor@sysoev.ru         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
2290Sigor@sysoev.ru         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
2300Sigor@sysoev.ru 
2311Sigor@sysoev.ru         nxt_http_source_header_filter(task, hs, b);
2320Sigor@sysoev.ru         return;
2330Sigor@sysoev.ru     }
2340Sigor@sysoev.ru 
2350Sigor@sysoev.ru     if (nxt_slow_path(ret == NXT_ERROR)) {
2360Sigor@sysoev.ru         /* HTTP/0.9 response. */
2370Sigor@sysoev.ru         hs->header_in.status = 200;
2381Sigor@sysoev.ru         nxt_http_source_header_ready(task, hs, b);
2390Sigor@sysoev.ru         return;
2400Sigor@sysoev.ru     }
2410Sigor@sysoev.ru 
2420Sigor@sysoev.ru     /* ret == NXT_AGAIN */
2430Sigor@sysoev.ru 
2440Sigor@sysoev.ru     /*
2450Sigor@sysoev.ru      * b->mem.pos is always equal to b->mem.end because b is a buffer
2460Sigor@sysoev.ru      * which points to a response part read by the stream source.
2470Sigor@sysoev.ru      * However, since the stream source is an immediate source of the
2480Sigor@sysoev.ru      * status filter, b->parent is a buffer the stream source reads in.
2490Sigor@sysoev.ru      */
2500Sigor@sysoev.ru     if (b->parent->mem.pos == b->parent->mem.end) {
2510Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
2520Sigor@sysoev.ru                                 b->mem.pos - b->mem.start, b->mem.start);
2530Sigor@sysoev.ru 
2541Sigor@sysoev.ru         nxt_http_source_fail(task, hs);
2550Sigor@sysoev.ru     }
2560Sigor@sysoev.ru }
2570Sigor@sysoev.ru 
2580Sigor@sysoev.ru 
2590Sigor@sysoev.ru static void
2601Sigor@sysoev.ru nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data)
2610Sigor@sysoev.ru {
2620Sigor@sysoev.ru     nxt_int_t          ret;
2630Sigor@sysoev.ru     nxt_buf_t          *b;
2640Sigor@sysoev.ru     nxt_http_source_t  *hs;
2650Sigor@sysoev.ru 
2660Sigor@sysoev.ru     hs = obj;
2670Sigor@sysoev.ru     b = data;
2680Sigor@sysoev.ru 
2690Sigor@sysoev.ru     /*
2700Sigor@sysoev.ru      * No cycle over buffer chain is required since at
2710Sigor@sysoev.ru      * start the stream source passes buffers one at a time.
2720Sigor@sysoev.ru      */
2730Sigor@sysoev.ru 
2741Sigor@sysoev.ru     nxt_debug(task, "http source header filter");
2750Sigor@sysoev.ru 
2760Sigor@sysoev.ru     if (nxt_slow_path(nxt_buf_is_sync(b))) {
2771Sigor@sysoev.ru         nxt_http_source_sync_buffer(task, hs, b);
2780Sigor@sysoev.ru         return;
2790Sigor@sysoev.ru     }
2800Sigor@sysoev.ru 
2810Sigor@sysoev.ru     for ( ;; ) {
2820Sigor@sysoev.ru         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
2830Sigor@sysoev.ru 
2840Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
2850Sigor@sysoev.ru             break;
2860Sigor@sysoev.ru         }
2870Sigor@sysoev.ru 
2880Sigor@sysoev.ru         ret = nxt_http_source_header_line_process(hs);
2890Sigor@sysoev.ru 
2900Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
2910Sigor@sysoev.ru             break;
2920Sigor@sysoev.ru         }
2930Sigor@sysoev.ru     }
2940Sigor@sysoev.ru 
2950Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_DONE)) {
2961Sigor@sysoev.ru         nxt_debug(task, "http source header done");
2971Sigor@sysoev.ru         nxt_http_source_header_ready(task, hs, b);
2980Sigor@sysoev.ru         return;
2990Sigor@sysoev.ru     }
3000Sigor@sysoev.ru 
3010Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_AGAIN)) {
3020Sigor@sysoev.ru         return;
3030Sigor@sysoev.ru     }
3040Sigor@sysoev.ru 
3050Sigor@sysoev.ru     if (ret != NXT_ERROR) {
3060Sigor@sysoev.ru         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
3071Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,
3081Sigor@sysoev.ru                 "upstream sent invalid header line: \"%*s\\r...\"",
3091Sigor@sysoev.ru                 hs->u.header.parse.header_end
3101Sigor@sysoev.ru                     - hs->u.header.parse.header_name_start,
3111Sigor@sysoev.ru                 hs->u.header.parse.header_name_start);
3120Sigor@sysoev.ru     }
3130Sigor@sysoev.ru 
3140Sigor@sysoev.ru     /* ret == NXT_ERROR */
3150Sigor@sysoev.ru 
3161Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
3170Sigor@sysoev.ru }
3180Sigor@sysoev.ru 
3190Sigor@sysoev.ru 
3200Sigor@sysoev.ru static nxt_int_t
3210Sigor@sysoev.ru nxt_http_source_header_line_process(nxt_http_source_t *hs)
3220Sigor@sysoev.ru {
3230Sigor@sysoev.ru     size_t                     name_len;
3240Sigor@sysoev.ru     nxt_name_value_t           *nv;
3250Sigor@sysoev.ru     nxt_lvlhsh_query_t         lhq;
3260Sigor@sysoev.ru     nxt_http_header_parse_t    *hp;
3270Sigor@sysoev.ru     nxt_upstream_name_value_t  *unv;
3280Sigor@sysoev.ru 
3290Sigor@sysoev.ru     hp = &hs->u.header.parse;
3300Sigor@sysoev.ru 
3310Sigor@sysoev.ru     name_len = hp->header_name_end - hp->header_name_start;
3320Sigor@sysoev.ru 
3330Sigor@sysoev.ru     if (name_len > 255) {
3340Sigor@sysoev.ru         nxt_http_source_message("upstream sent too long header field name: "
3350Sigor@sysoev.ru                                 "\"%*s\"", name_len, hp->header_name_start);
3360Sigor@sysoev.ru         return NXT_ERROR;
3370Sigor@sysoev.ru     }
3380Sigor@sysoev.ru 
3390Sigor@sysoev.ru     nv = nxt_list_add(hs->header_in.list);
3400Sigor@sysoev.ru     if (nxt_slow_path(nv == NULL)) {
3410Sigor@sysoev.ru         return NXT_ERROR;
3420Sigor@sysoev.ru     }
3430Sigor@sysoev.ru 
3440Sigor@sysoev.ru     nv->hash = hp->header_hash;
3450Sigor@sysoev.ru     nv->skip = 0;
3460Sigor@sysoev.ru     nv->name_len = name_len;
3470Sigor@sysoev.ru     nv->name_start = hp->header_name_start;
3480Sigor@sysoev.ru     nv->value_len = hp->header_end - hp->header_start;
3490Sigor@sysoev.ru     nv->value_start = hp->header_start;
3500Sigor@sysoev.ru 
3510Sigor@sysoev.ru     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
3520Sigor@sysoev.ru                          nv->name_len, nv->name_start,
3530Sigor@sysoev.ru                          nv->value_len, nv->value_start);
3540Sigor@sysoev.ru 
3550Sigor@sysoev.ru     lhq.key_hash = nv->hash;
3560Sigor@sysoev.ru     lhq.key.len = nv->name_len;
3570Sigor@sysoev.ru     lhq.key.data = nv->name_start;
3580Sigor@sysoev.ru     lhq.proto = &nxt_upstream_header_hash_proto;
3590Sigor@sysoev.ru 
3600Sigor@sysoev.ru     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
3610Sigor@sysoev.ru         unv = lhq.value;
3620Sigor@sysoev.ru 
3630Sigor@sysoev.ru         if (unv->handler(hs->upstream, nv) != NXT_OK) {
3640Sigor@sysoev.ru             return NXT_ERROR;
3650Sigor@sysoev.ru         }
3660Sigor@sysoev.ru     }
3670Sigor@sysoev.ru 
3680Sigor@sysoev.ru     return NXT_OK;
3690Sigor@sysoev.ru }
3700Sigor@sysoev.ru 
3710Sigor@sysoev.ru 
3720Sigor@sysoev.ru static const nxt_upstream_name_value_t  nxt_http_source_headers[]
3730Sigor@sysoev.ru     nxt_aligned(32) =
3740Sigor@sysoev.ru {
3750Sigor@sysoev.ru     { nxt_http_source_content_length,
3760Sigor@sysoev.ru       nxt_upstream_name_value("content-length") },
3770Sigor@sysoev.ru 
3780Sigor@sysoev.ru     { nxt_http_source_transfer_encoding,
3790Sigor@sysoev.ru       nxt_upstream_name_value("transfer-encoding") },
3800Sigor@sysoev.ru };
3810Sigor@sysoev.ru 
3820Sigor@sysoev.ru 
3830Sigor@sysoev.ru nxt_int_t
384*65Sigor@sysoev.ru nxt_http_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh)
3850Sigor@sysoev.ru {
3860Sigor@sysoev.ru     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
3870Sigor@sysoev.ru                                         nxt_nitems(nxt_http_source_headers));
3880Sigor@sysoev.ru }
3890Sigor@sysoev.ru 
3900Sigor@sysoev.ru 
3910Sigor@sysoev.ru static nxt_int_t
3920Sigor@sysoev.ru nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
3930Sigor@sysoev.ru {
3940Sigor@sysoev.ru     nxt_off_t          length;
3950Sigor@sysoev.ru     nxt_http_source_t  *hs;
3960Sigor@sysoev.ru 
3970Sigor@sysoev.ru     length = nxt_off_t_parse(nv->value_start, nv->value_len);
3980Sigor@sysoev.ru 
3990Sigor@sysoev.ru     if (nxt_fast_path(length > 0)) {
4000Sigor@sysoev.ru         hs = us->protocol_source;
4010Sigor@sysoev.ru         hs->header_in.content_length = length;
4020Sigor@sysoev.ru         return NXT_OK;
4030Sigor@sysoev.ru     }
4040Sigor@sysoev.ru 
4050Sigor@sysoev.ru     return NXT_ERROR;
4060Sigor@sysoev.ru }
4070Sigor@sysoev.ru 
4080Sigor@sysoev.ru 
4090Sigor@sysoev.ru static nxt_int_t
4100Sigor@sysoev.ru nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
4110Sigor@sysoev.ru     nxt_name_value_t *nv)
4120Sigor@sysoev.ru {
4130Sigor@sysoev.ru     u_char             *end;
4140Sigor@sysoev.ru     nxt_http_source_t  *hs;
4150Sigor@sysoev.ru 
4160Sigor@sysoev.ru     end = nv->value_start + nv->value_len;
4170Sigor@sysoev.ru 
4180Sigor@sysoev.ru     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
4190Sigor@sysoev.ru         hs = us->protocol_source;
4200Sigor@sysoev.ru         hs->chunked = 1;
4210Sigor@sysoev.ru     }
4220Sigor@sysoev.ru 
4230Sigor@sysoev.ru     return NXT_OK;
4240Sigor@sysoev.ru }
4250Sigor@sysoev.ru 
4260Sigor@sysoev.ru 
4270Sigor@sysoev.ru static void
4281Sigor@sysoev.ru nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs,
4291Sigor@sysoev.ru     nxt_buf_t *rest)
4300Sigor@sysoev.ru {
4310Sigor@sysoev.ru     nxt_buf_t                *b;
4320Sigor@sysoev.ru     nxt_upstream_source_t    *us;
4330Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
4340Sigor@sysoev.ru 
4350Sigor@sysoev.ru     us = hs->upstream;
4360Sigor@sysoev.ru 
4370Sigor@sysoev.ru     /* Free buffers used for request header. */
4380Sigor@sysoev.ru 
4390Sigor@sysoev.ru     for (b = us->stream->out; b != NULL; b = b->next) {
4400Sigor@sysoev.ru         nxt_buf_pool_free(&us->buffers, b);
4410Sigor@sysoev.ru     }
4420Sigor@sysoev.ru 
4430Sigor@sysoev.ru     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
4440Sigor@sysoev.ru 
4450Sigor@sysoev.ru         if (hs->chunked) {
446*65Sigor@sysoev.ru             hsc = nxt_mp_zalloc(hs->upstream->buffers.mem_pool,
447*65Sigor@sysoev.ru                                 sizeof(nxt_http_source_chunk_t));
4480Sigor@sysoev.ru             if (nxt_slow_path(hsc == NULL)) {
4490Sigor@sysoev.ru                 goto fail;
4500Sigor@sysoev.ru             }
4510Sigor@sysoev.ru 
4520Sigor@sysoev.ru             /*
4530Sigor@sysoev.ru              * Change the HTTP source filter chain:
4540Sigor@sysoev.ru              *    stream source | chunk filter | HTTP body filter
4550Sigor@sysoev.ru              */
4560Sigor@sysoev.ru             hs->query.context = hsc;
4570Sigor@sysoev.ru             hs->query.filter = nxt_http_source_chunk_filter;
4580Sigor@sysoev.ru 
4590Sigor@sysoev.ru             hsc->next.context = hs;
4600Sigor@sysoev.ru             hsc->next.filter = nxt_http_source_body_filter;
4610Sigor@sysoev.ru 
4620Sigor@sysoev.ru             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
4630Sigor@sysoev.ru 
4640Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
4651Sigor@sysoev.ru                 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest);
4660Sigor@sysoev.ru 
4670Sigor@sysoev.ru                 if (nxt_slow_path(hs->rest == NULL)) {
4680Sigor@sysoev.ru                     goto fail;
4690Sigor@sysoev.ru                 }
4700Sigor@sysoev.ru             }
4710Sigor@sysoev.ru 
4720Sigor@sysoev.ru         } else {
4730Sigor@sysoev.ru             /*
4740Sigor@sysoev.ru              * Change the HTTP source filter chain:
4750Sigor@sysoev.ru              *    stream source | HTTP body filter
4760Sigor@sysoev.ru              */
4770Sigor@sysoev.ru             hs->query.filter = nxt_http_source_body_filter;
4780Sigor@sysoev.ru 
4790Sigor@sysoev.ru             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
4800Sigor@sysoev.ru                 hs->rest = rest;
4810Sigor@sysoev.ru             }
4820Sigor@sysoev.ru         }
4830Sigor@sysoev.ru 
4840Sigor@sysoev.ru         hs->upstream->state->ready_handler(hs);
4850Sigor@sysoev.ru         return;
4860Sigor@sysoev.ru     }
4870Sigor@sysoev.ru 
4880Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
4890Sigor@sysoev.ru                          "are not enough to read upstream response",
4900Sigor@sysoev.ru                          us->buffers.max, us->buffers.size / 1024);
4910Sigor@sysoev.ru fail:
4920Sigor@sysoev.ru 
4931Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
4940Sigor@sysoev.ru }
4950Sigor@sysoev.ru 
4960Sigor@sysoev.ru 
4970Sigor@sysoev.ru static void
4981Sigor@sysoev.ru nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data)
4990Sigor@sysoev.ru {
5000Sigor@sysoev.ru     nxt_buf_t                *b;
5010Sigor@sysoev.ru     nxt_http_source_t        *hs;
5020Sigor@sysoev.ru     nxt_http_source_chunk_t  *hsc;
5030Sigor@sysoev.ru 
5040Sigor@sysoev.ru     hsc = obj;
5050Sigor@sysoev.ru     b = data;
5060Sigor@sysoev.ru 
5071Sigor@sysoev.ru     nxt_debug(task, "http source chunk filter");
5080Sigor@sysoev.ru 
5091Sigor@sysoev.ru     b = nxt_http_chunk_parse(task, &hsc->parse, b);
5100Sigor@sysoev.ru 
5110Sigor@sysoev.ru     hs = hsc->next.context;
5120Sigor@sysoev.ru 
5130Sigor@sysoev.ru     if (hsc->parse.error) {
5141Sigor@sysoev.ru         nxt_http_source_fail(task, hs);
5150Sigor@sysoev.ru         return;
5160Sigor@sysoev.ru     }
5170Sigor@sysoev.ru 
5180Sigor@sysoev.ru     if (hsc->parse.chunk_error) {
5190Sigor@sysoev.ru         /* Output all parsed before a chunk error and close upstream. */
5201Sigor@sysoev.ru         nxt_thread_current_work_queue_add(task->thread,
5211Sigor@sysoev.ru                                           nxt_http_source_chunk_error,
5221Sigor@sysoev.ru                                           task, hs, NULL);
5230Sigor@sysoev.ru     }
5240Sigor@sysoev.ru 
5250Sigor@sysoev.ru     if (b != NULL) {
5261Sigor@sysoev.ru         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
5271Sigor@sysoev.ru                           &hsc->next, b);
5280Sigor@sysoev.ru     }
5290Sigor@sysoev.ru }
5300Sigor@sysoev.ru 
5310Sigor@sysoev.ru 
5320Sigor@sysoev.ru static void
5331Sigor@sysoev.ru nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data)
5340Sigor@sysoev.ru {
5350Sigor@sysoev.ru     nxt_http_source_t  *hs;
5360Sigor@sysoev.ru 
5370Sigor@sysoev.ru     hs = obj;
5380Sigor@sysoev.ru 
5391Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
5400Sigor@sysoev.ru }
5410Sigor@sysoev.ru 
5420Sigor@sysoev.ru 
5430Sigor@sysoev.ru /*
5440Sigor@sysoev.ru  * The HTTP source body filter accumulates first body buffers before the next
5450Sigor@sysoev.ru  * filter will be established and sets completion handler for the last buffer.
5460Sigor@sysoev.ru  */
5470Sigor@sysoev.ru 
5480Sigor@sysoev.ru static void
5491Sigor@sysoev.ru nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data)
5500Sigor@sysoev.ru {
5510Sigor@sysoev.ru     nxt_buf_t          *b, *in;
5520Sigor@sysoev.ru     nxt_http_source_t  *hs;
5530Sigor@sysoev.ru 
5540Sigor@sysoev.ru     hs = obj;
5550Sigor@sysoev.ru     in = data;
5560Sigor@sysoev.ru 
5571Sigor@sysoev.ru     nxt_debug(task, "http source body filter");
5580Sigor@sysoev.ru 
5590Sigor@sysoev.ru     for (b = in; b != NULL; b = b->next) {
5600Sigor@sysoev.ru 
5610Sigor@sysoev.ru         if (nxt_buf_is_last(b)) {
5620Sigor@sysoev.ru             b->data = hs->upstream->data;
5630Sigor@sysoev.ru             b->completion_handler = hs->upstream->state->completion_handler;
5640Sigor@sysoev.ru         }
5650Sigor@sysoev.ru     }
5660Sigor@sysoev.ru 
5670Sigor@sysoev.ru     if (hs->next != NULL) {
5681Sigor@sysoev.ru         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
5691Sigor@sysoev.ru                           hs->next, in);
5700Sigor@sysoev.ru         return;
5710Sigor@sysoev.ru     }
5720Sigor@sysoev.ru 
5730Sigor@sysoev.ru     nxt_buf_chain_add(&hs->rest, in);
5740Sigor@sysoev.ru }
5750Sigor@sysoev.ru 
5760Sigor@sysoev.ru 
5770Sigor@sysoev.ru static void
5781Sigor@sysoev.ru nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
5790Sigor@sysoev.ru     nxt_buf_t *b)
5800Sigor@sysoev.ru {
5810Sigor@sysoev.ru     if (nxt_buf_is_last(b)) {
5821Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,
5831Sigor@sysoev.ru                 "upstream closed prematurely connection");
5840Sigor@sysoev.ru 
5850Sigor@sysoev.ru     } else {
5861Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not "
5871Sigor@sysoev.ru                 "enough to process upstream response header",
5881Sigor@sysoev.ru                 hs->upstream->buffers.max, hs->upstream->buffers.size);
5890Sigor@sysoev.ru     }
5900Sigor@sysoev.ru 
5910Sigor@sysoev.ru     /* The stream source sends only the last and the nobuf sync buffer. */
5920Sigor@sysoev.ru 
5931Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
5940Sigor@sysoev.ru }
5950Sigor@sysoev.ru 
5960Sigor@sysoev.ru 
5970Sigor@sysoev.ru static void
5981Sigor@sysoev.ru nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
5990Sigor@sysoev.ru {
6000Sigor@sysoev.ru     nxt_http_source_t  *hs;
6010Sigor@sysoev.ru 
6020Sigor@sysoev.ru     nxt_thread_log_debug("http source error");
6030Sigor@sysoev.ru 
6040Sigor@sysoev.ru     hs = stream->next->context;
6051Sigor@sysoev.ru     nxt_http_source_fail(task, hs);
6060Sigor@sysoev.ru }
6070Sigor@sysoev.ru 
6080Sigor@sysoev.ru 
6090Sigor@sysoev.ru static void
6101Sigor@sysoev.ru nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs)
6110Sigor@sysoev.ru {
6121Sigor@sysoev.ru     nxt_debug(task, "http source fail");
6130Sigor@sysoev.ru 
6140Sigor@sysoev.ru     /* TODO: fail, next upstream, or bad gateway */
6150Sigor@sysoev.ru 
6161Sigor@sysoev.ru     hs->upstream->state->error_handler(task, hs, NULL);
6170Sigor@sysoev.ru }
6180Sigor@sysoev.ru 
6190Sigor@sysoev.ru 
6200Sigor@sysoev.ru static void
6210Sigor@sysoev.ru nxt_http_source_message(const char *msg, size_t len, u_char *p)
6220Sigor@sysoev.ru {
6230Sigor@sysoev.ru     if (len > NXT_MAX_ERROR_STR - 300) {
6240Sigor@sysoev.ru         len = NXT_MAX_ERROR_STR - 300;
6250Sigor@sysoev.ru         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
6260Sigor@sysoev.ru     }
6270Sigor@sysoev.ru 
6280Sigor@sysoev.ru     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
6290Sigor@sysoev.ru }
630