xref: /unit/src/nxt_http_source.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 typedef struct {  /* Context of the chunked body filter; allocated in nxt_http_source_header_ready(). */
11     nxt_http_chunk_parse_t  parse;  /* State handed to nxt_http_chunk_parse(). */
12     nxt_source_hook_t       next;   /* Hook that receives the buffers produced by the chunk parser. */
13 } nxt_http_source_chunk_t;
14 
15 
16 static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
17 /* Filters installed in hs->query while the status line and the header are read. */
18 static void nxt_http_source_status_filter(nxt_thread_t *thr, void *obj,
19     void *data);
20 static void nxt_http_source_header_filter(nxt_thread_t *thr, void *obj,
21     void *data);
22 /* Processing of one parsed header line and the handlers of the known headers. */
23 static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
24 static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
25     nxt_name_value_t *nv);
26 static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
27     nxt_name_value_t *nv);
28 /* Body stage: switching the filter chain, chunk parsing and body output. */
29 static void nxt_http_source_header_ready(nxt_http_source_t *hs,
30     nxt_buf_t *rest);
31 static void nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj,
32     void *data);
33 static void nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj,
34     void *data);
35 static void nxt_http_source_body_filter(nxt_thread_t *thr, void *obj,
36     void *data);
37 /* Error logging and failure reporting helpers. */
38 static void nxt_http_source_sync_buffer(nxt_thread_t *thr,
39     nxt_http_source_t *hs, nxt_buf_t *b);
40 static void nxt_http_source_error(nxt_stream_source_t *stream);
41 static void nxt_http_source_fail(nxt_http_source_t *hs);
42 static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
43 
44 
45 void
46 nxt_http_source_handler(nxt_upstream_source_t *us,
47     nxt_http_source_request_create_t request_create)
48 {
49     nxt_http_source_t    *hs;
50     nxt_stream_source_t  *stream;
51 
52     hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t));
53     if (nxt_slow_path(hs == NULL)) {
54         goto fail;
55     }
56 
57     us->protocol_source = hs;
58 
59     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
60                                          sizeof(nxt_name_value_t));
61     if (nxt_slow_path(hs->header_in.list == NULL)) {
62         goto fail;
63     }
64 
65     hs->header_in.hash = us->header_hash;
66     hs->upstream = us;
67     hs->request_create = request_create;
68 
69     stream = us->stream;
70 
71     if (stream == NULL) {
72         stream = nxt_mem_zalloc(us->buffers.mem_pool,
73                                 sizeof(nxt_stream_source_t));
74         if (nxt_slow_path(stream == NULL)) {
75             goto fail;
76         }
77 
78         us->stream = stream;
79         stream->upstream = us;
80 
81     } else {
82         nxt_memzero(stream, sizeof(nxt_stream_source_t));
83     }
84 
85     /*
86      * Create the HTTP source filter chain:
87      *   stream source | HTTP status line filter
88      */
89     stream->next = &hs->query;
90     stream->error_handler = nxt_http_source_error;
91 
92     hs->query.context = hs;
93     hs->query.filter = nxt_http_source_status_filter;
94 
95     hs->header_in.content_length = -1;
96 
97     stream->out = nxt_http_source_request_create(hs);
98 
99     if (nxt_fast_path(stream->out != NULL)) {
100         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
101 
102         nxt_stream_source_connect(stream);
103         return;
104     }
105 
106 fail:
107 
108     nxt_http_source_fail(hs);
109 }
110 
111 
112 nxt_inline u_char *
113 nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
114 {
115     u_char  *s;
116 
117     if (nxt_fast_path(len >= src->len)) {
118         len = src->len;
119         src->len = 0;
120 
121     } else {
122         src->len -= len;
123     }
124 
125     s = src->data;
126     src->data += len;
127 
128     return nxt_cpymem(p, s, len);
129 }
130 
131 
132 static nxt_buf_t *
133 nxt_http_source_request_create(nxt_http_source_t *hs)
134 {
135     nxt_int_t  ret;
136     nxt_buf_t  *b, *req, **prev;
137 
138     nxt_thread_log_debug("http source create request");
139 
140     prev = &req;
141 
142 new_buffer:
143 
144     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
145     if (nxt_slow_path(ret != NXT_OK)) {
146         return NULL;
147     }
148 
149     b = hs->upstream->buffers.current;
150     hs->upstream->buffers.current = NULL;
151 
152     *prev = b;
153     prev = &b->next;
154 
155     for ( ;; ) {
156         ret = hs->request_create(hs);
157 
158         if (nxt_fast_path(ret == NXT_OK)) {
159             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
160                                                b->mem.end - b->mem.free);
161 
162             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
163                 continue;
164             }
165 
166             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
167                                  b->mem.pos);
168 
169             goto new_buffer;
170         }
171 
172         if (nxt_slow_path(ret == NXT_ERROR)) {
173             return NULL;
174         }
175 
176         /* ret == NXT_DONE */
177         break;
178     }
179 
180     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
181 
182     return req;
183 }
184 
185 
186 static void
187 nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data)
188 {
189     nxt_int_t          ret;
190     nxt_buf_t          *b;
191     nxt_http_source_t  *hs;
192 
193     hs = obj;
194     b = data;
195 
196     /*
197      * No cycle over buffer chain is required since at
198      * start the stream source passes buffers one at a time.
199      */
200 
201     nxt_log_debug(thr->log, "http source status filter");
202 
203     if (nxt_slow_path(nxt_buf_is_sync(b))) {
204         nxt_http_source_sync_buffer(thr, hs, b);
205         return;
206     }
207 
208     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
209 
210     if (nxt_fast_path(ret == NXT_OK)) {
211         /*
212          * Change the HTTP source filter chain:
213          *    stream source | HTTP header filter
214          */
215         hs->query.filter = nxt_http_source_header_filter;
216 
217         nxt_log_debug(thr->log, "upstream status: \"%*s\"",
218                       hs->u.status_parse.end - b->mem.start, b->mem.start);
219 
220         hs->header_in.status = hs->u.status_parse.code;
221 
222         nxt_log_debug(thr->log, "upstream version:%d status:%uD \"%*s\"",
223                       hs->u.status_parse.http_version,
224                       hs->u.status_parse.code,
225                       hs->u.status_parse.end - hs->u.status_parse.start,
226                       hs->u.status_parse.start);
227 
228         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
229         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
230 
231         nxt_http_source_header_filter(thr, hs, b);
232         return;
233     }
234 
235     if (nxt_slow_path(ret == NXT_ERROR)) {
236         /* HTTP/0.9 response. */
237         hs->header_in.status = 200;
238         nxt_http_source_header_ready(hs, b);
239         return;
240     }
241 
242     /* ret == NXT_AGAIN */
243 
244     /*
245      * b->mem.pos is always equal to b->mem.end because b is a buffer
246      * which points to a response part read by the stream source.
247      * However, since the stream source is an immediate source of the
248      * status filter, b->parent is a buffer the stream source reads in.
249      */
250     if (b->parent->mem.pos == b->parent->mem.end) {
251         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
252                                 b->mem.pos - b->mem.start, b->mem.start);
253 
254         nxt_http_source_fail(hs);
255     }
256 }
257 
258 
259 static void
260 nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
261 {
262     nxt_int_t          ret;
263     nxt_buf_t          *b;
264     nxt_http_source_t  *hs;
265 
266     hs = obj;
267     b = data;
268 
269     /*
270      * No cycle over buffer chain is required since at
271      * start the stream source passes buffers one at a time.
272      */
273 
274     nxt_log_debug(thr->log, "http source header filter");
275 
276     if (nxt_slow_path(nxt_buf_is_sync(b))) {
277         nxt_http_source_sync_buffer(thr, hs, b);
278         return;
279     }
280 
281     for ( ;; ) {
282         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
283 
284         if (nxt_slow_path(ret != NXT_OK)) {
285             break;
286         }
287 
288         ret = nxt_http_source_header_line_process(hs);
289 
290         if (nxt_slow_path(ret != NXT_OK)) {
291             break;
292         }
293     }
294 
295     if (nxt_fast_path(ret == NXT_DONE)) {
296         nxt_log_debug(thr->log, "http source header done");
297         nxt_http_source_header_ready(hs, b);
298         return;
299     }
300 
301     if (nxt_fast_path(ret == NXT_AGAIN)) {
302         return;
303     }
304 
305     if (ret != NXT_ERROR) {
306         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
307         nxt_log_error(NXT_LOG_ERR, thr->log,
308                       "upstream sent invalid header line: \"%*s\\r...\"",
309                       hs->u.header.parse.header_end
310                           - hs->u.header.parse.header_name_start,
311                       hs->u.header.parse.header_name_start);
312     }
313 
314     /* ret == NXT_ERROR */
315 
316     nxt_http_source_fail(hs);
317 }
318 
319 
320 static nxt_int_t
321 nxt_http_source_header_line_process(nxt_http_source_t *hs)
322 {
323     size_t                     name_len;
324     nxt_name_value_t           *nv;
325     nxt_lvlhsh_query_t         lhq;
326     nxt_http_header_parse_t    *hp;
327     nxt_upstream_name_value_t  *unv;
328 
329     hp = &hs->u.header.parse;
330 
331     name_len = hp->header_name_end - hp->header_name_start;
332 
333     if (name_len > 255) {
334         nxt_http_source_message("upstream sent too long header field name: "
335                                 "\"%*s\"", name_len, hp->header_name_start);
336         return NXT_ERROR;
337     }
338 
339     nv = nxt_list_add(hs->header_in.list);
340     if (nxt_slow_path(nv == NULL)) {
341         return NXT_ERROR;
342     }
343 
344     nv->hash = hp->header_hash;
345     nv->skip = 0;
346     nv->name_len = name_len;
347     nv->name_start = hp->header_name_start;
348     nv->value_len = hp->header_end - hp->header_start;
349     nv->value_start = hp->header_start;
350 
351     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
352                          nv->name_len, nv->name_start,
353                          nv->value_len, nv->value_start);
354 
355     lhq.key_hash = nv->hash;
356     lhq.key.len = nv->name_len;
357     lhq.key.data = nv->name_start;
358     lhq.proto = &nxt_upstream_header_hash_proto;
359 
360     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
361         unv = lhq.value;
362 
363         if (unv->handler(hs->upstream, nv) != NXT_OK) {
364             return NXT_ERROR;
365         }
366     }
367 
368     return NXT_OK;
369 }
370 
371 
372 static const nxt_upstream_name_value_t  nxt_http_source_headers[]  /* Handlers of the response headers this file processes itself. */
373     nxt_aligned(32) =
374 {
375     { nxt_http_source_content_length,  /* Stores the value in header_in.content_length. */
376       nxt_upstream_name_value("content-length") },
377 
378     { nxt_http_source_transfer_encoding,  /* Sets hs->chunked if the value contains "chunked". */
379       nxt_upstream_name_value("transfer-encoding") },
380 };
381 
382 
383 nxt_int_t
384 nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
385 {
386     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
387                                         nxt_nitems(nxt_http_source_headers));
388 }
389 
390 
391 static nxt_int_t
392 nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
393 {
394     nxt_off_t          length;
395     nxt_http_source_t  *hs;
396 
397     length = nxt_off_t_parse(nv->value_start, nv->value_len);
398 
399     if (nxt_fast_path(length > 0)) {
400         hs = us->protocol_source;
401         hs->header_in.content_length = length;
402         return NXT_OK;
403     }
404 
405     return NXT_ERROR;
406 }
407 
408 
409 static nxt_int_t
410 nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
411     nxt_name_value_t *nv)
412 {
413     u_char             *end;
414     nxt_http_source_t  *hs;
415 
416     end = nv->value_start + nv->value_len;
417 
418     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
419         hs = us->protocol_source;
420         hs->chunked = 1;
421     }
422 
423     return NXT_OK;
424 }
425 
426 
427 static void
428 nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest)
429 {
430     nxt_buf_t                *b;
431     nxt_upstream_source_t    *us;
432     nxt_http_source_chunk_t  *hsc;
433 
434     us = hs->upstream;
435 
436     /* Free buffers used for request header. */
437 
438     for (b = us->stream->out; b != NULL; b = b->next) {
439         nxt_buf_pool_free(&us->buffers, b);
440     }
441 
442     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
443 
444         if (hs->chunked) {
445             hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool,
446                                  sizeof(nxt_http_source_chunk_t));
447             if (nxt_slow_path(hsc == NULL)) {
448                 goto fail;
449             }
450 
451             /*
452              * Change the HTTP source filter chain:
453              *    stream source | chunk filter | HTTP body filter
454              */
455             hs->query.context = hsc;
456             hs->query.filter = nxt_http_source_chunk_filter;
457 
458             hsc->next.context = hs;
459             hsc->next.filter = nxt_http_source_body_filter;
460 
461             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
462 
463             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
464                 hs->rest = nxt_http_chunk_parse(&hsc->parse, rest);
465 
466                 if (nxt_slow_path(hs->rest == NULL)) {
467                     goto fail;
468                 }
469             }
470 
471         } else {
472             /*
473              * Change the HTTP source filter chain:
474              *    stream source | HTTP body filter
475              */
476             hs->query.filter = nxt_http_source_body_filter;
477 
478             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
479                 hs->rest = rest;
480             }
481         }
482 
483         hs->upstream->state->ready_handler(hs);
484         return;
485     }
486 
487     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
488                          "are not enough to read upstream response",
489                          us->buffers.max, us->buffers.size / 1024);
490 fail:
491 
492     nxt_http_source_fail(hs);
493 }
494 
495 
496 static void
497 nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data)
498 {
499     nxt_buf_t                *b;
500     nxt_http_source_t        *hs;
501     nxt_http_source_chunk_t  *hsc;
502 
503     hsc = obj;
504     b = data;
505 
506     nxt_log_debug(thr->log, "http source chunk filter");
507 
508     b = nxt_http_chunk_parse(&hsc->parse, b);
509 
510     hs = hsc->next.context;
511 
512     if (hsc->parse.error) {
513         nxt_http_source_fail(hs);
514         return;
515     }
516 
517     if (hsc->parse.chunk_error) {
518         /* Output all parsed before a chunk error and close upstream. */
519         nxt_thread_current_work_queue_add(thr, nxt_http_source_chunk_error,
520                                           hs, NULL, thr->log);
521     }
522 
523     if (b != NULL) {
524         nxt_source_filter(thr, hs->upstream->work_queue, &hsc->next, b);
525     }
526 }
527 
528 
529 static void
530 nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data)
531 {
532     nxt_http_source_t  *hs;
533 
534     hs = obj;
535 
536     nxt_http_source_fail(hs);
537 }
538 
539 
540 /*
541  * The HTTP source body filter accumulates first body buffers before the next
542  * filter will be established and sets completion handler for the last buffer.
543  */
544 
545 static void
546 nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
547 {
548     nxt_buf_t          *b, *in;
549     nxt_http_source_t  *hs;
550 
551     hs = obj;
552     in = data;
553 
554     nxt_log_debug(thr->log, "http source body filter");
555 
556     for (b = in; b != NULL; b = b->next) {
557 
558         if (nxt_buf_is_last(b)) {
559             b->data = hs->upstream->data;
560             b->completion_handler = hs->upstream->state->completion_handler;
561         }
562     }
563 
564     if (hs->next != NULL) {
565         nxt_source_filter(thr, hs->upstream->work_queue, hs->next, in);
566         return;
567     }
568 
569     nxt_buf_chain_add(&hs->rest, in);
570 }
571 
572 
573 static void
574 nxt_http_source_sync_buffer(nxt_thread_t *thr, nxt_http_source_t *hs,
575     nxt_buf_t *b)
576 {
577     if (nxt_buf_is_last(b)) {
578         nxt_log_error(NXT_LOG_ERR, thr->log,
579                       "upstream closed prematurely connection");
580 
581     } else {
582         nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
583                       "enough to process upstream response header",
584                       hs->upstream->buffers.max,
585                       hs->upstream->buffers.size);
586     }
587 
588     /* The stream source sends only the last and the nobuf sync buffer. */
589 
590     nxt_http_source_fail(hs);
591 }
592 
593 
594 static void
595 nxt_http_source_error(nxt_stream_source_t *stream)
596 {
597     nxt_http_source_t  *hs;
598 
599     nxt_thread_log_debug("http source error");
600 
601     hs = stream->next->context;
602     nxt_http_source_fail(hs);
603 }
604 
605 
606 static void
607 nxt_http_source_fail(nxt_http_source_t *hs)
608 {
609     nxt_thread_t  *thr;
610 
611     thr = nxt_thread();
612 
613     nxt_log_debug(thr->log, "http source fail");
614 
615     /* TODO: fail, next upstream, or bad gateway */
616 
617     hs->upstream->state->error_handler(thr, hs, NULL);
618 }
619 
620 
621 static void
622 nxt_http_source_message(const char *msg, size_t len, u_char *p)
623 {
624     if (len > NXT_MAX_ERROR_STR - 300) {
625         len = NXT_MAX_ERROR_STR - 300;
626         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
627     }
628 
629     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
630 }
631