1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 typedef struct { 11 nxt_http_chunk_parse_t parse; 12 nxt_source_hook_t next; 13 } nxt_http_source_chunk_t; 14 15 16 static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs); 17 18 static void nxt_http_source_status_filter(nxt_task_t *task, void *obj, 19 void *data); 20 static void nxt_http_source_header_filter(nxt_task_t *task, void *obj, 21 void *data); 22 23 static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs); 24 static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, 25 nxt_name_value_t *nv); 26 static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, 27 nxt_name_value_t *nv); 28 29 static void nxt_http_source_header_ready(nxt_task_t *task, 30 nxt_http_source_t *hs, nxt_buf_t *rest); 31 static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, 32 void *data); 33 static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj, 34 void *data); 35 static void nxt_http_source_body_filter(nxt_task_t *task, void *obj, 36 void *data); 37 38 static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, 39 nxt_buf_t *b); 40 static void nxt_http_source_error(nxt_task_t *task, 41 nxt_stream_source_t *stream); 42 static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs); 43 static void nxt_http_source_message(const char *msg, size_t len, u_char *p); 44 45 46 void 47 nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, 48 nxt_http_source_request_create_t request_create) 49 { 50 nxt_http_source_t *hs; 51 nxt_stream_source_t *stream; 52 53 hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t)); 54 if (nxt_slow_path(hs == NULL)) { 55 goto fail; 56 } 57 58 us->protocol_source = hs; 59 60 hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, 61 sizeof(nxt_name_value_t)); 62 if (nxt_slow_path(hs->header_in.list == NULL)) { 63 goto fail; 64 } 65 66 hs->header_in.hash = us->header_hash; 67 hs->upstream = us; 68 hs->request_create = request_create; 69 70 stream = us->stream; 71 72 if (stream == NULL) { 73 stream = nxt_mem_zalloc(us->buffers.mem_pool, 74 sizeof(nxt_stream_source_t)); 75 if (nxt_slow_path(stream == NULL)) { 76 goto fail; 77 } 78 79 us->stream = stream; 80 stream->upstream = us; 81 82 } else { 83 nxt_memzero(stream, sizeof(nxt_stream_source_t)); 84 } 85 86 /* 87 * Create the HTTP source filter chain: 88 * stream source | HTTP status line filter 89 */ 90 stream->next = &hs->query; 91 stream->error_handler = nxt_http_source_error; 92 93 hs->query.context = hs; 94 hs->query.filter = nxt_http_source_status_filter; 95 96 hs->header_in.content_length = -1; 97 98 stream->out = nxt_http_source_request_create(hs); 99 100 if (nxt_fast_path(stream->out != NULL)) { 101 nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t)); 102 103 nxt_stream_source_connect(task, stream); 104 return; 105 } 106 107 fail: 108 109 nxt_http_source_fail(task, hs); 110 } 111 112 113 nxt_inline u_char * 114 nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len) 115 { 116 u_char *s; 117 118 if (nxt_fast_path(len >= src->len)) { 119 len = src->len; 120 src->len = 0; 121 122 } else { 123 src->len -= len; 124 } 125 126 s = src->data; 127 src->data += len; 128 129 return nxt_cpymem(p, s, len); 130 } 131 132 133 static nxt_buf_t * 134 nxt_http_source_request_create(nxt_http_source_t *hs) 135 { 136 nxt_int_t ret; 137 nxt_buf_t *b, *req, **prev; 138 139 nxt_thread_log_debug("http source create request"); 140 141 prev = &req; 142 143 new_buffer: 144 145 ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0); 146 if (nxt_slow_path(ret != NXT_OK)) { 147 return NULL; 148 } 149 150 b = hs->upstream->buffers.current; 151 hs->upstream->buffers.current = NULL; 152 153 *prev = b; 154 prev = &b->next; 155 156 for ( ;; ) { 157 ret = hs->request_create(hs); 158 159 if (nxt_fast_path(ret == NXT_OK)) { 160 b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy, 161 b->mem.end - b->mem.free); 162 163 if (nxt_fast_path(hs->u.request.copy.len == 0)) { 164 continue; 165 } 166 167 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, 168 b->mem.pos); 169 170 goto new_buffer; 171 } 172 173 if (nxt_slow_path(ret == NXT_ERROR)) { 174 return NULL; 175 } 176 177 /* ret == NXT_DONE */ 178 break; 179 } 180 181 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 182 183 return req; 184 } 185 186 187 static void 188 nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data) 189 { 190 nxt_int_t ret; 191 nxt_buf_t *b; 192 nxt_http_source_t *hs; 193 194 hs = obj; 195 b = data; 196 197 /* 198 * No cycle over buffer chain is required since at 199 * start the stream source passes buffers one at a time. 200 */ 201 202 nxt_debug(task, "http source status filter"); 203 204 if (nxt_slow_path(nxt_buf_is_sync(b))) { 205 nxt_http_source_sync_buffer(task, hs, b); 206 return; 207 } 208 209 ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem); 210 211 if (nxt_fast_path(ret == NXT_OK)) { 212 /* 213 * Change the HTTP source filter chain: 214 * stream source | HTTP header filter 215 */ 216 hs->query.filter = nxt_http_source_header_filter; 217 218 nxt_debug(task, "upstream status: \"%*s\"", 219 hs->u.status_parse.end - b->mem.start, b->mem.start); 220 221 hs->header_in.status = hs->u.status_parse.code; 222 223 nxt_debug(task, "upstream version:%d status:%uD \"%*s\"", 224 hs->u.status_parse.http_version, 225 hs->u.status_parse.code, 226 hs->u.status_parse.end - hs->u.status_parse.start, 227 hs->u.status_parse.start); 228 229 nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t)); 230 hs->u.header.mem_pool = hs->upstream->buffers.mem_pool; 231 232 nxt_http_source_header_filter(task, hs, b); 233 return; 234 } 235 236 if (nxt_slow_path(ret == NXT_ERROR)) { 237 /* HTTP/0.9 response. */ 238 hs->header_in.status = 200; 239 nxt_http_source_header_ready(task, hs, b); 240 return; 241 } 242 243 /* ret == NXT_AGAIN */ 244 245 /* 246 * b->mem.pos is always equal to b->mem.end because b is a buffer 247 * which points to a response part read by the stream source. 248 * However, since the stream source is an immediate source of the 249 * status filter, b->parent is a buffer the stream source reads in. 250 */ 251 if (b->parent->mem.pos == b->parent->mem.end) { 252 nxt_http_source_message("upstream sent too long status line: \"%*s\"", 253 b->mem.pos - b->mem.start, b->mem.start); 254 255 nxt_http_source_fail(task, hs); 256 } 257 } 258 259 260 static void 261 nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data) 262 { 263 nxt_int_t ret; 264 nxt_buf_t *b; 265 nxt_http_source_t *hs; 266 267 hs = obj; 268 b = data; 269 270 /* 271 * No cycle over buffer chain is required since at 272 * start the stream source passes buffers one at a time. 273 */ 274 275 nxt_debug(task, "http source header filter"); 276 277 if (nxt_slow_path(nxt_buf_is_sync(b))) { 278 nxt_http_source_sync_buffer(task, hs, b); 279 return; 280 } 281 282 for ( ;; ) { 283 ret = nxt_http_split_header_parse(&hs->u.header, &b->mem); 284 285 if (nxt_slow_path(ret != NXT_OK)) { 286 break; 287 } 288 289 ret = nxt_http_source_header_line_process(hs); 290 291 if (nxt_slow_path(ret != NXT_OK)) { 292 break; 293 } 294 } 295 296 if (nxt_fast_path(ret == NXT_DONE)) { 297 nxt_debug(task, "http source header done"); 298 nxt_http_source_header_ready(task, hs, b); 299 return; 300 } 301 302 if (nxt_fast_path(ret == NXT_AGAIN)) { 303 return; 304 } 305 306 if (ret != NXT_ERROR) { 307 /* ret == NXT_DECLINED: "\r" is not followed by "\n" */ 308 nxt_log(task, NXT_LOG_ERR, 309 "upstream sent invalid header line: \"%*s\\r...\"", 310 hs->u.header.parse.header_end 311 - hs->u.header.parse.header_name_start, 312 hs->u.header.parse.header_name_start); 313 } 314 315 /* ret == NXT_ERROR */ 316 317 nxt_http_source_fail(task, hs); 318 } 319 320 321 static nxt_int_t 322 nxt_http_source_header_line_process(nxt_http_source_t *hs) 323 { 324 size_t name_len; 325 nxt_name_value_t *nv; 326 nxt_lvlhsh_query_t lhq; 327 nxt_http_header_parse_t *hp; 328 nxt_upstream_name_value_t *unv; 329 330 hp = &hs->u.header.parse; 331 332 name_len = hp->header_name_end - hp->header_name_start; 333 334 if (name_len > 255) { 335 nxt_http_source_message("upstream sent too long header field name: " 336 "\"%*s\"", name_len, hp->header_name_start); 337 return NXT_ERROR; 338 } 339 340 nv = nxt_list_add(hs->header_in.list); 341 if (nxt_slow_path(nv == NULL)) { 342 return NXT_ERROR; 343 } 344 345 nv->hash = hp->header_hash; 346 nv->skip = 0; 347 nv->name_len = name_len; 348 nv->name_start = hp->header_name_start; 349 nv->value_len = hp->header_end - hp->header_start; 350 nv->value_start = hp->header_start; 351 352 nxt_thread_log_debug("upstream header: \"%*s: %*s\"", 353 nv->name_len, nv->name_start, 354 nv->value_len, nv->value_start); 355 356 lhq.key_hash = nv->hash; 357 lhq.key.len = nv->name_len; 358 lhq.key.data = nv->name_start; 359 lhq.proto = &nxt_upstream_header_hash_proto; 360 361 if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) { 362 unv = lhq.value; 363 364 if (unv->handler(hs->upstream, nv) != NXT_OK) { 365 return NXT_ERROR; 366 } 367 } 368 369 return NXT_OK; 370 } 371 372 373 static const nxt_upstream_name_value_t nxt_http_source_headers[] 374 nxt_aligned(32) = 375 { 376 { nxt_http_source_content_length, 377 nxt_upstream_name_value("content-length") }, 378 379 { nxt_http_source_transfer_encoding, 380 nxt_upstream_name_value("transfer-encoding") }, 381 }; 382 383 384 nxt_int_t 385 nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh) 386 { 387 return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers, 388 nxt_nitems(nxt_http_source_headers)); 389 } 390 391 392 static nxt_int_t 393 nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv) 394 { 395 nxt_off_t length; 396 nxt_http_source_t *hs; 397 398 length = nxt_off_t_parse(nv->value_start, nv->value_len); 399 400 if (nxt_fast_path(length > 0)) { 401 hs = us->protocol_source; 402 hs->header_in.content_length = length; 403 return NXT_OK; 404 } 405 406 return NXT_ERROR; 407 } 408 409 410 static nxt_int_t 411 nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, 412 nxt_name_value_t *nv) 413 { 414 u_char *end; 415 nxt_http_source_t *hs; 416 417 end = nv->value_start + nv->value_len; 418 419 if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) { 420 hs = us->protocol_source; 421 hs->chunked = 1; 422 } 423 424 return NXT_OK; 425 } 426 427 428 static void 429 nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs, 430 nxt_buf_t *rest) 431 { 432 nxt_buf_t *b; 433 nxt_upstream_source_t *us; 434 nxt_http_source_chunk_t *hsc; 435 436 us = hs->upstream; 437 438 /* Free buffers used for request header. */ 439 440 for (b = us->stream->out; b != NULL; b = b->next) { 441 nxt_buf_pool_free(&us->buffers, b); 442 } 443 444 if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) { 445 446 if (hs->chunked) { 447 hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool, 448 sizeof(nxt_http_source_chunk_t)); 449 if (nxt_slow_path(hsc == NULL)) { 450 goto fail; 451 } 452 453 /* 454 * Change the HTTP source filter chain: 455 * stream source | chunk filter | HTTP body filter 456 */ 457 hs->query.context = hsc; 458 hs->query.filter = nxt_http_source_chunk_filter; 459 460 hsc->next.context = hs; 461 hsc->next.filter = nxt_http_source_body_filter; 462 463 hsc->parse.mem_pool = hs->upstream->buffers.mem_pool; 464 465 if (nxt_buf_mem_used_size(&rest->mem) != 0) { 466 hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest); 467 468 if (nxt_slow_path(hs->rest == NULL)) { 469 goto fail; 470 } 471 } 472 473 } else { 474 /* 475 * Change the HTTP source filter chain: 476 * stream source | HTTP body filter 477 */ 478 hs->query.filter = nxt_http_source_body_filter; 479 480 if (nxt_buf_mem_used_size(&rest->mem) != 0) { 481 hs->rest = rest; 482 } 483 } 484 485 hs->upstream->state->ready_handler(hs); 486 return; 487 } 488 489 nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each " 490 "are not enough to read upstream response", 491 us->buffers.max, us->buffers.size / 1024); 492 fail: 493 494 nxt_http_source_fail(task, hs); 495 } 496 497 498 static void 499 nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data) 500 { 501 nxt_buf_t *b; 502 nxt_http_source_t *hs; 503 nxt_http_source_chunk_t *hsc; 504 505 hsc = obj; 506 b = data; 507 508 nxt_debug(task, "http source chunk filter"); 509 510 b = nxt_http_chunk_parse(task, &hsc->parse, b); 511 512 hs = hsc->next.context; 513 514 if (hsc->parse.error) { 515 nxt_http_source_fail(task, hs); 516 return; 517 } 518 519 if (hsc->parse.chunk_error) { 520 /* Output all parsed before a chunk error and close upstream. */ 521 nxt_thread_current_work_queue_add(task->thread, 522 nxt_http_source_chunk_error, 523 task, hs, NULL); 524 } 525 526 if (b != NULL) { 527 nxt_source_filter(task->thread, hs->upstream->work_queue, task, 528 &hsc->next, b); 529 } 530 } 531 532 533 static void 534 nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data) 535 { 536 nxt_http_source_t *hs; 537 538 hs = obj; 539 540 nxt_http_source_fail(task, hs); 541 } 542 543 544 /* 545 * The HTTP source body filter accumulates first body buffers before the next 546 * filter will be established and sets completion handler for the last buffer. 547 */ 548 549 static void 550 nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data) 551 { 552 nxt_buf_t *b, *in; 553 nxt_http_source_t *hs; 554 555 hs = obj; 556 in = data; 557 558 nxt_debug(task, "http source body filter"); 559 560 for (b = in; b != NULL; b = b->next) { 561 562 if (nxt_buf_is_last(b)) { 563 b->data = hs->upstream->data; 564 b->completion_handler = hs->upstream->state->completion_handler; 565 } 566 } 567 568 if (hs->next != NULL) { 569 nxt_source_filter(task->thread, hs->upstream->work_queue, task, 570 hs->next, in); 571 return; 572 } 573 574 nxt_buf_chain_add(&hs->rest, in); 575 } 576 577 578 static void 579 nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, 580 nxt_buf_t *b) 581 { 582 if (nxt_buf_is_last(b)) { 583 nxt_log(task, NXT_LOG_ERR, 584 "upstream closed prematurely connection"); 585 586 } else { 587 nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not " 588 "enough to process upstream response header", 589 hs->upstream->buffers.max, hs->upstream->buffers.size); 590 } 591 592 /* The stream source sends only the last and the nobuf sync buffer. */ 593 594 nxt_http_source_fail(task, hs); 595 } 596 597 598 static void 599 nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream) 600 { 601 nxt_http_source_t *hs; 602 603 nxt_thread_log_debug("http source error"); 604 605 hs = stream->next->context; 606 nxt_http_source_fail(task, hs); 607 } 608 609 610 static void 611 nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs) 612 { 613 nxt_debug(task, "http source fail"); 614 615 /* TODO: fail, next upstream, or bad gateway */ 616 617 hs->upstream->state->error_handler(task, hs, NULL); 618 } 619 620 621 static void 622 nxt_http_source_message(const char *msg, size_t len, u_char *p) 623 { 624 if (len > NXT_MAX_ERROR_STR - 300) { 625 len = NXT_MAX_ERROR_STR - 300; 626 p[len++] = '.'; p[len++] = '.'; p[len++] = '.'; 627 } 628 629 nxt_thread_log_error(NXT_LOG_ERR, msg, len, p); 630 } 631