xref: /unit/src/nxt_http_source.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 typedef struct {
11     nxt_http_chunk_parse_t  parse;
12     nxt_source_hook_t       next;
13 } nxt_http_source_chunk_t;
14 
15 
16 static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
17 
18 static void nxt_http_source_status_filter(nxt_task_t *task, void *obj,
19     void *data);
20 static void nxt_http_source_header_filter(nxt_task_t *task, void *obj,
21     void *data);
22 
23 static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
24 static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
25     nxt_name_value_t *nv);
26 static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
27     nxt_name_value_t *nv);
28 
29 static void nxt_http_source_header_ready(nxt_task_t *task,
30     nxt_http_source_t *hs, nxt_buf_t *rest);
31 static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_http_source_body_filter(nxt_task_t *task, void *obj,
36     void *data);
37 
38 static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
39     nxt_buf_t *b);
40 static void nxt_http_source_error(nxt_task_t *task,
41     nxt_stream_source_t *stream);
42 static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs);
43 static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
44 
45 
46 void
nxt_http_source_handler(nxt_task_t * task,nxt_upstream_source_t * us,nxt_http_source_request_create_t request_create)47 nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
48     nxt_http_source_request_create_t request_create)
49 {
50     nxt_http_source_t    *hs;
51     nxt_stream_source_t  *stream;
52 
53     hs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_http_source_t));
54     if (nxt_slow_path(hs == NULL)) {
55         goto fail;
56     }
57 
58     us->protocol_source = hs;
59 
60     hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
61                                          sizeof(nxt_name_value_t));
62     if (nxt_slow_path(hs->header_in.list == NULL)) {
63         goto fail;
64     }
65 
66     hs->header_in.hash = us->header_hash;
67     hs->upstream = us;
68     hs->request_create = request_create;
69 
70     stream = us->stream;
71 
72     if (stream == NULL) {
73         stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t));
74         if (nxt_slow_path(stream == NULL)) {
75             goto fail;
76         }
77 
78         us->stream = stream;
79         stream->upstream = us;
80 
81     } else {
82         nxt_memzero(stream, sizeof(nxt_stream_source_t));
83     }
84 
85     /*
86      * Create the HTTP source filter chain:
87      *   stream source | HTTP status line filter
88      */
89     stream->next = &hs->query;
90     stream->error_handler = nxt_http_source_error;
91 
92     hs->query.context = hs;
93     hs->query.filter = nxt_http_source_status_filter;
94 
95     hs->header_in.content_length = -1;
96 
97     stream->out = nxt_http_source_request_create(hs);
98 
99     if (nxt_fast_path(stream->out != NULL)) {
100         nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
101 
102         nxt_stream_source_connect(task, stream);
103         return;
104     }
105 
106 fail:
107 
108     nxt_http_source_fail(task, hs);
109 }
110 
111 
112 nxt_inline u_char *
nxt_http_source_copy(u_char * p,nxt_str_t * src,size_t len)113 nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
114 {
115     u_char  *s;
116 
117     if (nxt_fast_path(len >= src->len)) {
118         len = src->len;
119         src->len = 0;
120 
121     } else {
122         src->len -= len;
123     }
124 
125     s = src->data;
126     src->data += len;
127 
128     return nxt_cpymem(p, s, len);
129 }
130 
131 
132 static nxt_buf_t *
nxt_http_source_request_create(nxt_http_source_t * hs)133 nxt_http_source_request_create(nxt_http_source_t *hs)
134 {
135     nxt_int_t  ret;
136     nxt_buf_t  *b, *req, **prev;
137 
138     nxt_thread_log_debug("http source create request");
139 
140     prev = &req;
141 
142 new_buffer:
143 
144     ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
145     if (nxt_slow_path(ret != NXT_OK)) {
146         return NULL;
147     }
148 
149     b = hs->upstream->buffers.current;
150     hs->upstream->buffers.current = NULL;
151 
152     *prev = b;
153     prev = &b->next;
154 
155     for ( ;; ) {
156         ret = hs->request_create(hs);
157 
158         if (nxt_fast_path(ret == NXT_OK)) {
159             b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
160                                                b->mem.end - b->mem.free);
161 
162             if (nxt_fast_path(hs->u.request.copy.len == 0)) {
163                 continue;
164             }
165 
166             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
167                                  b->mem.pos);
168 
169             goto new_buffer;
170         }
171 
172         if (nxt_slow_path(ret == NXT_ERROR)) {
173             return NULL;
174         }
175 
176         /* ret == NXT_DONE */
177         break;
178     }
179 
180     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
181 
182     return req;
183 }
184 
185 
186 static void
nxt_http_source_status_filter(nxt_task_t * task,void * obj,void * data)187 nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data)
188 {
189     nxt_int_t          ret;
190     nxt_buf_t          *b;
191     nxt_http_source_t  *hs;
192 
193     hs = obj;
194     b = data;
195 
196     /*
197      * No cycle over buffer chain is required since at
198      * start the stream source passes buffers one at a time.
199      */
200 
201     nxt_debug(task, "http source status filter");
202 
203     if (nxt_slow_path(nxt_buf_is_sync(b))) {
204         nxt_http_source_sync_buffer(task, hs, b);
205         return;
206     }
207 
208     ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
209 
210     if (nxt_fast_path(ret == NXT_OK)) {
211         /*
212          * Change the HTTP source filter chain:
213          *    stream source | HTTP header filter
214          */
215         hs->query.filter = nxt_http_source_header_filter;
216 
217         nxt_debug(task, "upstream status: \"%*s\"",
218                   hs->u.status_parse.end - b->mem.start, b->mem.start);
219 
220         hs->header_in.status = hs->u.status_parse.code;
221 
222         nxt_debug(task, "upstream version:%d status:%uD \"%*s\"",
223                   hs->u.status_parse.http_version,
224                   hs->u.status_parse.code,
225                   hs->u.status_parse.end - hs->u.status_parse.start,
226                   hs->u.status_parse.start);
227 
228         nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
229         hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
230 
231         nxt_http_source_header_filter(task, hs, b);
232         return;
233     }
234 
235     if (nxt_slow_path(ret == NXT_ERROR)) {
236         /* HTTP/0.9 response. */
237         hs->header_in.status = 200;
238         nxt_http_source_header_ready(task, hs, b);
239         return;
240     }
241 
242     /* ret == NXT_AGAIN */
243 
244     /*
245      * b->mem.pos is always equal to b->mem.end because b is a buffer
246      * which points to a response part read by the stream source.
247      * However, since the stream source is an immediate source of the
248      * status filter, b->parent is a buffer the stream source reads in.
249      */
250     if (b->parent->mem.pos == b->parent->mem.end) {
251         nxt_http_source_message("upstream sent too long status line: \"%*s\"",
252                                 b->mem.pos - b->mem.start, b->mem.start);
253 
254         nxt_http_source_fail(task, hs);
255     }
256 }
257 
258 
259 static void
nxt_http_source_header_filter(nxt_task_t * task,void * obj,void * data)260 nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data)
261 {
262     nxt_int_t          ret;
263     nxt_buf_t          *b;
264     nxt_http_source_t  *hs;
265 
266     hs = obj;
267     b = data;
268 
269     /*
270      * No cycle over buffer chain is required since at
271      * start the stream source passes buffers one at a time.
272      */
273 
274     nxt_debug(task, "http source header filter");
275 
276     if (nxt_slow_path(nxt_buf_is_sync(b))) {
277         nxt_http_source_sync_buffer(task, hs, b);
278         return;
279     }
280 
281     for ( ;; ) {
282         ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
283 
284         if (nxt_slow_path(ret != NXT_OK)) {
285             break;
286         }
287 
288         ret = nxt_http_source_header_line_process(hs);
289 
290         if (nxt_slow_path(ret != NXT_OK)) {
291             break;
292         }
293     }
294 
295     if (nxt_fast_path(ret == NXT_DONE)) {
296         nxt_debug(task, "http source header done");
297         nxt_http_source_header_ready(task, hs, b);
298         return;
299     }
300 
301     if (nxt_fast_path(ret == NXT_AGAIN)) {
302         return;
303     }
304 
305     if (ret != NXT_ERROR) {
306         /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
307         nxt_log(task, NXT_LOG_ERR,
308                 "upstream sent invalid header line: \"%*s\\r...\"",
309                 hs->u.header.parse.header_end
310                     - hs->u.header.parse.header_name_start,
311                 hs->u.header.parse.header_name_start);
312     }
313 
314     /* ret == NXT_ERROR */
315 
316     nxt_http_source_fail(task, hs);
317 }
318 
319 
320 static nxt_int_t
nxt_http_source_header_line_process(nxt_http_source_t * hs)321 nxt_http_source_header_line_process(nxt_http_source_t *hs)
322 {
323     size_t                     name_len;
324     nxt_name_value_t           *nv;
325     nxt_lvlhsh_query_t         lhq;
326     nxt_http_header_parse_t    *hp;
327     nxt_upstream_name_value_t  *unv;
328 
329     hp = &hs->u.header.parse;
330 
331     name_len = hp->header_name_end - hp->header_name_start;
332 
333     if (name_len > 255) {
334         nxt_http_source_message("upstream sent too long header field name: "
335                                 "\"%*s\"", name_len, hp->header_name_start);
336         return NXT_ERROR;
337     }
338 
339     nv = nxt_list_add(hs->header_in.list);
340     if (nxt_slow_path(nv == NULL)) {
341         return NXT_ERROR;
342     }
343 
344     nv->hash = hp->header_hash;
345     nv->skip = 0;
346     nv->name_len = name_len;
347     nv->name_start = hp->header_name_start;
348     nv->value_len = hp->header_end - hp->header_start;
349     nv->value_start = hp->header_start;
350 
351     nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
352                          nv->name_len, nv->name_start,
353                          nv->value_len, nv->value_start);
354 
355     lhq.key_hash = nv->hash;
356     lhq.key.len = nv->name_len;
357     lhq.key.data = nv->name_start;
358     lhq.proto = &nxt_upstream_header_hash_proto;
359 
360     if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
361         unv = lhq.value;
362 
363         if (unv->handler(hs->upstream, nv) != NXT_OK) {
364             return NXT_ERROR;
365         }
366     }
367 
368     return NXT_OK;
369 }
370 
371 
372 static const nxt_upstream_name_value_t  nxt_http_source_headers[]
373     nxt_aligned(32) =
374 {
375     { nxt_http_source_content_length,
376       nxt_upstream_name_value("content-length") },
377 
378     { nxt_http_source_transfer_encoding,
379       nxt_upstream_name_value("transfer-encoding") },
380 };
381 
382 
383 nxt_int_t
nxt_http_source_hash_create(nxt_mp_t * mp,nxt_lvlhsh_t * lh)384 nxt_http_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh)
385 {
386     return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
387                                         nxt_nitems(nxt_http_source_headers));
388 }
389 
390 
391 static nxt_int_t
nxt_http_source_content_length(nxt_upstream_source_t * us,nxt_name_value_t * nv)392 nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
393 {
394     nxt_off_t          length;
395     nxt_http_source_t  *hs;
396 
397     length = nxt_off_t_parse(nv->value_start, nv->value_len);
398 
399     if (nxt_fast_path(length > 0)) {
400         hs = us->protocol_source;
401         hs->header_in.content_length = length;
402         return NXT_OK;
403     }
404 
405     return NXT_ERROR;
406 }
407 
408 
409 static nxt_int_t
nxt_http_source_transfer_encoding(nxt_upstream_source_t * us,nxt_name_value_t * nv)410 nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
411     nxt_name_value_t *nv)
412 {
413     u_char             *end;
414     nxt_http_source_t  *hs;
415 
416     end = nv->value_start + nv->value_len;
417 
418     if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
419         hs = us->protocol_source;
420         hs->chunked = 1;
421     }
422 
423     return NXT_OK;
424 }
425 
426 
427 static void
nxt_http_source_header_ready(nxt_task_t * task,nxt_http_source_t * hs,nxt_buf_t * rest)428 nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs,
429     nxt_buf_t *rest)
430 {
431     nxt_buf_t                *b;
432     nxt_upstream_source_t    *us;
433     nxt_http_source_chunk_t  *hsc;
434 
435     us = hs->upstream;
436 
437     /* Free buffers used for request header. */
438 
439     for (b = us->stream->out; b != NULL; b = b->next) {
440         nxt_buf_pool_free(&us->buffers, b);
441     }
442 
443     if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
444 
445         if (hs->chunked) {
446             hsc = nxt_mp_zalloc(hs->upstream->buffers.mem_pool,
447                                 sizeof(nxt_http_source_chunk_t));
448             if (nxt_slow_path(hsc == NULL)) {
449                 goto fail;
450             }
451 
452             /*
453              * Change the HTTP source filter chain:
454              *    stream source | chunk filter | HTTP body filter
455              */
456             hs->query.context = hsc;
457             hs->query.filter = nxt_http_source_chunk_filter;
458 
459             hsc->next.context = hs;
460             hsc->next.filter = nxt_http_source_body_filter;
461 
462             hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
463 
464             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
465                 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest);
466 
467                 if (nxt_slow_path(hs->rest == NULL)) {
468                     goto fail;
469                 }
470             }
471 
472         } else {
473             /*
474              * Change the HTTP source filter chain:
475              *    stream source | HTTP body filter
476              */
477             hs->query.filter = nxt_http_source_body_filter;
478 
479             if (nxt_buf_mem_used_size(&rest->mem) != 0) {
480                 hs->rest = rest;
481             }
482         }
483 
484         hs->upstream->state->ready_handler(hs);
485         return;
486     }
487 
488     nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
489                          "are not enough to read upstream response",
490                          us->buffers.max, us->buffers.size / 1024);
491 fail:
492 
493     nxt_http_source_fail(task, hs);
494 }
495 
496 
497 static void
nxt_http_source_chunk_filter(nxt_task_t * task,void * obj,void * data)498 nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data)
499 {
500     nxt_buf_t                *b;
501     nxt_http_source_t        *hs;
502     nxt_http_source_chunk_t  *hsc;
503 
504     hsc = obj;
505     b = data;
506 
507     nxt_debug(task, "http source chunk filter");
508 
509     b = nxt_http_chunk_parse(task, &hsc->parse, b);
510 
511     hs = hsc->next.context;
512 
513     if (hsc->parse.error) {
514         nxt_http_source_fail(task, hs);
515         return;
516     }
517 
518     if (hsc->parse.chunk_error) {
519         /* Output all parsed before a chunk error and close upstream. */
520         nxt_thread_current_work_queue_add(task->thread,
521                                           nxt_http_source_chunk_error,
522                                           task, hs, NULL);
523     }
524 
525     if (b != NULL) {
526         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
527                           &hsc->next, b);
528     }
529 }
530 
531 
532 static void
nxt_http_source_chunk_error(nxt_task_t * task,void * obj,void * data)533 nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data)
534 {
535     nxt_http_source_t  *hs;
536 
537     hs = obj;
538 
539     nxt_http_source_fail(task, hs);
540 }
541 
542 
543 /*
544  * The HTTP source body filter accumulates first body buffers before the next
545  * filter will be established and sets completion handler for the last buffer.
546  */
547 
548 static void
nxt_http_source_body_filter(nxt_task_t * task,void * obj,void * data)549 nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data)
550 {
551     nxt_buf_t          *b, *in;
552     nxt_http_source_t  *hs;
553 
554     hs = obj;
555     in = data;
556 
557     nxt_debug(task, "http source body filter");
558 
559     for (b = in; b != NULL; b = b->next) {
560 
561         if (nxt_buf_is_last(b)) {
562             b->data = hs->upstream->data;
563             b->completion_handler = hs->upstream->state->completion_handler;
564         }
565     }
566 
567     if (hs->next != NULL) {
568         nxt_source_filter(task->thread, hs->upstream->work_queue, task,
569                           hs->next, in);
570         return;
571     }
572 
573     nxt_buf_chain_add(&hs->rest, in);
574 }
575 
576 
577 static void
nxt_http_source_sync_buffer(nxt_task_t * task,nxt_http_source_t * hs,nxt_buf_t * b)578 nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
579     nxt_buf_t *b)
580 {
581     if (nxt_buf_is_last(b)) {
582         nxt_log(task, NXT_LOG_ERR,
583                 "upstream closed prematurely connection");
584 
585     } else {
586         nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not "
587                 "enough to process upstream response header",
588                 hs->upstream->buffers.max, hs->upstream->buffers.size);
589     }
590 
591     /* The stream source sends only the last and the nobuf sync buffer. */
592 
593     nxt_http_source_fail(task, hs);
594 }
595 
596 
597 static void
nxt_http_source_error(nxt_task_t * task,nxt_stream_source_t * stream)598 nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
599 {
600     nxt_http_source_t  *hs;
601 
602     nxt_thread_log_debug("http source error");
603 
604     hs = stream->next->context;
605     nxt_http_source_fail(task, hs);
606 }
607 
608 
609 static void
nxt_http_source_fail(nxt_task_t * task,nxt_http_source_t * hs)610 nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs)
611 {
612     nxt_debug(task, "http source fail");
613 
614     /* TODO: fail, next upstream, or bad gateway */
615 
616     hs->upstream->state->error_handler(task, hs, NULL);
617 }
618 
619 
620 static void
nxt_http_source_message(const char * msg,size_t len,u_char * p)621 nxt_http_source_message(const char *msg, size_t len, u_char *p)
622 {
623     if (len > NXT_MAX_ERROR_STR - 300) {
624         len = NXT_MAX_ERROR_STR - 300;
625         p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
626     }
627 
628     nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
629 }
630