1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
#include <nxt_main.h>
#include <stddef.h>
8
9
10 typedef struct {
11 nxt_http_chunk_parse_t parse;
12 nxt_source_hook_t next;
13 } nxt_http_source_chunk_t;
14
15
16 static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
17
18 static void nxt_http_source_status_filter(nxt_task_t *task, void *obj,
19 void *data);
20 static void nxt_http_source_header_filter(nxt_task_t *task, void *obj,
21 void *data);
22
23 static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
24 static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
25 nxt_name_value_t *nv);
26 static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
27 nxt_name_value_t *nv);
28
29 static void nxt_http_source_header_ready(nxt_task_t *task,
30 nxt_http_source_t *hs, nxt_buf_t *rest);
31 static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj,
32 void *data);
33 static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj,
34 void *data);
35 static void nxt_http_source_body_filter(nxt_task_t *task, void *obj,
36 void *data);
37
38 static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
39 nxt_buf_t *b);
40 static void nxt_http_source_error(nxt_task_t *task,
41 nxt_stream_source_t *stream);
42 static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs);
43 static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
44
45
46 void
nxt_http_source_handler(nxt_task_t * task,nxt_upstream_source_t * us,nxt_http_source_request_create_t request_create)47 nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
48 nxt_http_source_request_create_t request_create)
49 {
50 nxt_http_source_t *hs;
51 nxt_stream_source_t *stream;
52
53 hs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_http_source_t));
54 if (nxt_slow_path(hs == NULL)) {
55 goto fail;
56 }
57
58 us->protocol_source = hs;
59
60 hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
61 sizeof(nxt_name_value_t));
62 if (nxt_slow_path(hs->header_in.list == NULL)) {
63 goto fail;
64 }
65
66 hs->header_in.hash = us->header_hash;
67 hs->upstream = us;
68 hs->request_create = request_create;
69
70 stream = us->stream;
71
72 if (stream == NULL) {
73 stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t));
74 if (nxt_slow_path(stream == NULL)) {
75 goto fail;
76 }
77
78 us->stream = stream;
79 stream->upstream = us;
80
81 } else {
82 nxt_memzero(stream, sizeof(nxt_stream_source_t));
83 }
84
85 /*
86 * Create the HTTP source filter chain:
87 * stream source | HTTP status line filter
88 */
89 stream->next = &hs->query;
90 stream->error_handler = nxt_http_source_error;
91
92 hs->query.context = hs;
93 hs->query.filter = nxt_http_source_status_filter;
94
95 hs->header_in.content_length = -1;
96
97 stream->out = nxt_http_source_request_create(hs);
98
99 if (nxt_fast_path(stream->out != NULL)) {
100 nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
101
102 nxt_stream_source_connect(task, stream);
103 return;
104 }
105
106 fail:
107
108 nxt_http_source_fail(task, hs);
109 }
110
111
112 nxt_inline u_char *
nxt_http_source_copy(u_char * p,nxt_str_t * src,size_t len)113 nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
114 {
115 u_char *s;
116
117 if (nxt_fast_path(len >= src->len)) {
118 len = src->len;
119 src->len = 0;
120
121 } else {
122 src->len -= len;
123 }
124
125 s = src->data;
126 src->data += len;
127
128 return nxt_cpymem(p, s, len);
129 }
130
131
132 static nxt_buf_t *
nxt_http_source_request_create(nxt_http_source_t * hs)133 nxt_http_source_request_create(nxt_http_source_t *hs)
134 {
135 nxt_int_t ret;
136 nxt_buf_t *b, *req, **prev;
137
138 nxt_thread_log_debug("http source create request");
139
140 prev = &req;
141
142 new_buffer:
143
144 ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
145 if (nxt_slow_path(ret != NXT_OK)) {
146 return NULL;
147 }
148
149 b = hs->upstream->buffers.current;
150 hs->upstream->buffers.current = NULL;
151
152 *prev = b;
153 prev = &b->next;
154
155 for ( ;; ) {
156 ret = hs->request_create(hs);
157
158 if (nxt_fast_path(ret == NXT_OK)) {
159 b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
160 b->mem.end - b->mem.free);
161
162 if (nxt_fast_path(hs->u.request.copy.len == 0)) {
163 continue;
164 }
165
166 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
167 b->mem.pos);
168
169 goto new_buffer;
170 }
171
172 if (nxt_slow_path(ret == NXT_ERROR)) {
173 return NULL;
174 }
175
176 /* ret == NXT_DONE */
177 break;
178 }
179
180 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
181
182 return req;
183 }
184
185
186 static void
nxt_http_source_status_filter(nxt_task_t * task,void * obj,void * data)187 nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data)
188 {
189 nxt_int_t ret;
190 nxt_buf_t *b;
191 nxt_http_source_t *hs;
192
193 hs = obj;
194 b = data;
195
196 /*
197 * No cycle over buffer chain is required since at
198 * start the stream source passes buffers one at a time.
199 */
200
201 nxt_debug(task, "http source status filter");
202
203 if (nxt_slow_path(nxt_buf_is_sync(b))) {
204 nxt_http_source_sync_buffer(task, hs, b);
205 return;
206 }
207
208 ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
209
210 if (nxt_fast_path(ret == NXT_OK)) {
211 /*
212 * Change the HTTP source filter chain:
213 * stream source | HTTP header filter
214 */
215 hs->query.filter = nxt_http_source_header_filter;
216
217 nxt_debug(task, "upstream status: \"%*s\"",
218 hs->u.status_parse.end - b->mem.start, b->mem.start);
219
220 hs->header_in.status = hs->u.status_parse.code;
221
222 nxt_debug(task, "upstream version:%d status:%uD \"%*s\"",
223 hs->u.status_parse.http_version,
224 hs->u.status_parse.code,
225 hs->u.status_parse.end - hs->u.status_parse.start,
226 hs->u.status_parse.start);
227
228 nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
229 hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
230
231 nxt_http_source_header_filter(task, hs, b);
232 return;
233 }
234
235 if (nxt_slow_path(ret == NXT_ERROR)) {
236 /* HTTP/0.9 response. */
237 hs->header_in.status = 200;
238 nxt_http_source_header_ready(task, hs, b);
239 return;
240 }
241
242 /* ret == NXT_AGAIN */
243
244 /*
245 * b->mem.pos is always equal to b->mem.end because b is a buffer
246 * which points to a response part read by the stream source.
247 * However, since the stream source is an immediate source of the
248 * status filter, b->parent is a buffer the stream source reads in.
249 */
250 if (b->parent->mem.pos == b->parent->mem.end) {
251 nxt_http_source_message("upstream sent too long status line: \"%*s\"",
252 b->mem.pos - b->mem.start, b->mem.start);
253
254 nxt_http_source_fail(task, hs);
255 }
256 }
257
258
259 static void
nxt_http_source_header_filter(nxt_task_t * task,void * obj,void * data)260 nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data)
261 {
262 nxt_int_t ret;
263 nxt_buf_t *b;
264 nxt_http_source_t *hs;
265
266 hs = obj;
267 b = data;
268
269 /*
270 * No cycle over buffer chain is required since at
271 * start the stream source passes buffers one at a time.
272 */
273
274 nxt_debug(task, "http source header filter");
275
276 if (nxt_slow_path(nxt_buf_is_sync(b))) {
277 nxt_http_source_sync_buffer(task, hs, b);
278 return;
279 }
280
281 for ( ;; ) {
282 ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
283
284 if (nxt_slow_path(ret != NXT_OK)) {
285 break;
286 }
287
288 ret = nxt_http_source_header_line_process(hs);
289
290 if (nxt_slow_path(ret != NXT_OK)) {
291 break;
292 }
293 }
294
295 if (nxt_fast_path(ret == NXT_DONE)) {
296 nxt_debug(task, "http source header done");
297 nxt_http_source_header_ready(task, hs, b);
298 return;
299 }
300
301 if (nxt_fast_path(ret == NXT_AGAIN)) {
302 return;
303 }
304
305 if (ret != NXT_ERROR) {
306 /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
307 nxt_log(task, NXT_LOG_ERR,
308 "upstream sent invalid header line: \"%*s\\r...\"",
309 hs->u.header.parse.header_end
310 - hs->u.header.parse.header_name_start,
311 hs->u.header.parse.header_name_start);
312 }
313
314 /* ret == NXT_ERROR */
315
316 nxt_http_source_fail(task, hs);
317 }
318
319
320 static nxt_int_t
nxt_http_source_header_line_process(nxt_http_source_t * hs)321 nxt_http_source_header_line_process(nxt_http_source_t *hs)
322 {
323 size_t name_len;
324 nxt_name_value_t *nv;
325 nxt_lvlhsh_query_t lhq;
326 nxt_http_header_parse_t *hp;
327 nxt_upstream_name_value_t *unv;
328
329 hp = &hs->u.header.parse;
330
331 name_len = hp->header_name_end - hp->header_name_start;
332
333 if (name_len > 255) {
334 nxt_http_source_message("upstream sent too long header field name: "
335 "\"%*s\"", name_len, hp->header_name_start);
336 return NXT_ERROR;
337 }
338
339 nv = nxt_list_add(hs->header_in.list);
340 if (nxt_slow_path(nv == NULL)) {
341 return NXT_ERROR;
342 }
343
344 nv->hash = hp->header_hash;
345 nv->skip = 0;
346 nv->name_len = name_len;
347 nv->name_start = hp->header_name_start;
348 nv->value_len = hp->header_end - hp->header_start;
349 nv->value_start = hp->header_start;
350
351 nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
352 nv->name_len, nv->name_start,
353 nv->value_len, nv->value_start);
354
355 lhq.key_hash = nv->hash;
356 lhq.key.len = nv->name_len;
357 lhq.key.data = nv->name_start;
358 lhq.proto = &nxt_upstream_header_hash_proto;
359
360 if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
361 unv = lhq.value;
362
363 if (unv->handler(hs->upstream, nv) != NXT_OK) {
364 return NXT_ERROR;
365 }
366 }
367
368 return NXT_OK;
369 }
370
371
372 static const nxt_upstream_name_value_t nxt_http_source_headers[]
373 nxt_aligned(32) =
374 {
375 { nxt_http_source_content_length,
376 nxt_upstream_name_value("content-length") },
377
378 { nxt_http_source_transfer_encoding,
379 nxt_upstream_name_value("transfer-encoding") },
380 };
381
382
383 nxt_int_t
nxt_http_source_hash_create(nxt_mp_t * mp,nxt_lvlhsh_t * lh)384 nxt_http_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh)
385 {
386 return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
387 nxt_nitems(nxt_http_source_headers));
388 }
389
390
391 static nxt_int_t
nxt_http_source_content_length(nxt_upstream_source_t * us,nxt_name_value_t * nv)392 nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
393 {
394 nxt_off_t length;
395 nxt_http_source_t *hs;
396
397 length = nxt_off_t_parse(nv->value_start, nv->value_len);
398
399 if (nxt_fast_path(length > 0)) {
400 hs = us->protocol_source;
401 hs->header_in.content_length = length;
402 return NXT_OK;
403 }
404
405 return NXT_ERROR;
406 }
407
408
409 static nxt_int_t
nxt_http_source_transfer_encoding(nxt_upstream_source_t * us,nxt_name_value_t * nv)410 nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
411 nxt_name_value_t *nv)
412 {
413 u_char *end;
414 nxt_http_source_t *hs;
415
416 end = nv->value_start + nv->value_len;
417
418 if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
419 hs = us->protocol_source;
420 hs->chunked = 1;
421 }
422
423 return NXT_OK;
424 }
425
426
427 static void
nxt_http_source_header_ready(nxt_task_t * task,nxt_http_source_t * hs,nxt_buf_t * rest)428 nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs,
429 nxt_buf_t *rest)
430 {
431 nxt_buf_t *b;
432 nxt_upstream_source_t *us;
433 nxt_http_source_chunk_t *hsc;
434
435 us = hs->upstream;
436
437 /* Free buffers used for request header. */
438
439 for (b = us->stream->out; b != NULL; b = b->next) {
440 nxt_buf_pool_free(&us->buffers, b);
441 }
442
443 if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
444
445 if (hs->chunked) {
446 hsc = nxt_mp_zalloc(hs->upstream->buffers.mem_pool,
447 sizeof(nxt_http_source_chunk_t));
448 if (nxt_slow_path(hsc == NULL)) {
449 goto fail;
450 }
451
452 /*
453 * Change the HTTP source filter chain:
454 * stream source | chunk filter | HTTP body filter
455 */
456 hs->query.context = hsc;
457 hs->query.filter = nxt_http_source_chunk_filter;
458
459 hsc->next.context = hs;
460 hsc->next.filter = nxt_http_source_body_filter;
461
462 hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
463
464 if (nxt_buf_mem_used_size(&rest->mem) != 0) {
465 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest);
466
467 if (nxt_slow_path(hs->rest == NULL)) {
468 goto fail;
469 }
470 }
471
472 } else {
473 /*
474 * Change the HTTP source filter chain:
475 * stream source | HTTP body filter
476 */
477 hs->query.filter = nxt_http_source_body_filter;
478
479 if (nxt_buf_mem_used_size(&rest->mem) != 0) {
480 hs->rest = rest;
481 }
482 }
483
484 hs->upstream->state->ready_handler(hs);
485 return;
486 }
487
488 nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
489 "are not enough to read upstream response",
490 us->buffers.max, us->buffers.size / 1024);
491 fail:
492
493 nxt_http_source_fail(task, hs);
494 }
495
496
497 static void
nxt_http_source_chunk_filter(nxt_task_t * task,void * obj,void * data)498 nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data)
499 {
500 nxt_buf_t *b;
501 nxt_http_source_t *hs;
502 nxt_http_source_chunk_t *hsc;
503
504 hsc = obj;
505 b = data;
506
507 nxt_debug(task, "http source chunk filter");
508
509 b = nxt_http_chunk_parse(task, &hsc->parse, b);
510
511 hs = hsc->next.context;
512
513 if (hsc->parse.error) {
514 nxt_http_source_fail(task, hs);
515 return;
516 }
517
518 if (hsc->parse.chunk_error) {
519 /* Output all parsed before a chunk error and close upstream. */
520 nxt_thread_current_work_queue_add(task->thread,
521 nxt_http_source_chunk_error,
522 task, hs, NULL);
523 }
524
525 if (b != NULL) {
526 nxt_source_filter(task->thread, hs->upstream->work_queue, task,
527 &hsc->next, b);
528 }
529 }
530
531
532 static void
nxt_http_source_chunk_error(nxt_task_t * task,void * obj,void * data)533 nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data)
534 {
535 nxt_http_source_t *hs;
536
537 hs = obj;
538
539 nxt_http_source_fail(task, hs);
540 }
541
542
543 /*
544 * The HTTP source body filter accumulates first body buffers before the next
545 * filter will be established and sets completion handler for the last buffer.
546 */
547
548 static void
nxt_http_source_body_filter(nxt_task_t * task,void * obj,void * data)549 nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data)
550 {
551 nxt_buf_t *b, *in;
552 nxt_http_source_t *hs;
553
554 hs = obj;
555 in = data;
556
557 nxt_debug(task, "http source body filter");
558
559 for (b = in; b != NULL; b = b->next) {
560
561 if (nxt_buf_is_last(b)) {
562 b->data = hs->upstream->data;
563 b->completion_handler = hs->upstream->state->completion_handler;
564 }
565 }
566
567 if (hs->next != NULL) {
568 nxt_source_filter(task->thread, hs->upstream->work_queue, task,
569 hs->next, in);
570 return;
571 }
572
573 nxt_buf_chain_add(&hs->rest, in);
574 }
575
576
577 static void
nxt_http_source_sync_buffer(nxt_task_t * task,nxt_http_source_t * hs,nxt_buf_t * b)578 nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs,
579 nxt_buf_t *b)
580 {
581 if (nxt_buf_is_last(b)) {
582 nxt_log(task, NXT_LOG_ERR,
583 "upstream closed prematurely connection");
584
585 } else {
586 nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not "
587 "enough to process upstream response header",
588 hs->upstream->buffers.max, hs->upstream->buffers.size);
589 }
590
591 /* The stream source sends only the last and the nobuf sync buffer. */
592
593 nxt_http_source_fail(task, hs);
594 }
595
596
597 static void
nxt_http_source_error(nxt_task_t * task,nxt_stream_source_t * stream)598 nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
599 {
600 nxt_http_source_t *hs;
601
602 nxt_thread_log_debug("http source error");
603
604 hs = stream->next->context;
605 nxt_http_source_fail(task, hs);
606 }
607
608
609 static void
nxt_http_source_fail(nxt_task_t * task,nxt_http_source_t * hs)610 nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs)
611 {
612 nxt_debug(task, "http source fail");
613
614 /* TODO: fail, next upstream, or bad gateway */
615
616 hs->upstream->state->error_handler(task, hs, NULL);
617 }
618
619
620 static void
nxt_http_source_message(const char * msg,size_t len,u_char * p)621 nxt_http_source_message(const char *msg, size_t len, u_char *p)
622 {
623 if (len > NXT_MAX_ERROR_STR - 300) {
624 len = NXT_MAX_ERROR_STR - 300;
625 p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
626 }
627
628 nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
629 }
630