1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #define NXT_FASTCGI_RESPONDER 1 11 #define NXT_FASTCGI_KEEP_CONN 1 12 13 14 typedef struct { 15 u_char *buf; 16 uint32_t len; 17 u_char length[4]; 18 } nxt_fastcgi_param_t; 19 20 21 #define \ 22 nxt_fastcgi_set_record_length(p, length) \ 23 do { \ 24 uint32_t len = length; \ 25 \ 26 p[1] = (u_char) len; len >>= 8; \ 27 p[0] = (u_char) len; \ 28 } while (0) 29 30 31 nxt_inline size_t 32 nxt_fastcgi_param_length(u_char *p, uint32_t length) 33 { 34 if (nxt_fast_path(length < 128)) { 35 *p = (u_char) length; 36 return 1; 37 } 38 39 p[3] = (u_char) length; length >>= 8; 40 p[2] = (u_char) length; length >>= 8; 41 p[1] = (u_char) length; length >>= 8; 42 p[0] = (u_char) (length | 0x80); 43 44 return 4; 45 } 46 47 48 static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs); 49 static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, 50 nxt_fastcgi_param_t *param); 51 52 static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, 53 void *data); 54 static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, 55 void *data); 56 static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, 57 void *data); 58 static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, 59 nxt_fastcgi_source_t *fs, nxt_buf_t *b); 60 61 static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs); 62 static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us, 63 nxt_name_value_t *nv); 64 static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 65 nxt_name_value_t *nv); 66 67 static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, 68 nxt_buf_t *b); 69 static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, 70 void *data); 71 static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp); 72 static void nxt_fastcgi_source_error(nxt_stream_source_t *stream); 73 static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs); 74 75 76 /* 77 * A FastCGI request: 78 * FCGI_BEGIN_REQUEST record; 79 * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have 80 * zero content length, 81 * Several FCGI_STDIN records, the last FCGI_STDIN record must have 82 * zero content length. 83 */ 84 85 static const uint8_t nxt_fastcgi_begin_request[] = { 86 1, /* FastCGI version. */ 87 NXT_FASTCGI_BEGIN_REQUEST, /* The BEGIN_REQUEST record type. */ 88 0, 1, /* Request ID. */ 89 0, 8, /* Content length of the Role record. */ 90 0, /* Padding length. */ 91 0, /* Reserved. */ 92 93 0, NXT_FASTCGI_RESPONDER, /* The Responder Role. */ 94 0, /* Flags. */ 95 0, 0, 0, 0, 0, /* Reserved. */ 96 }; 97 98 99 static const uint8_t nxt_fastcgi_params_record[] = { 100 1, /* FastCGI version. */ 101 NXT_FASTCGI_PARAMS, /* The PARAMS record type. */ 102 0, 1, /* Request ID. */ 103 0, 0, /* Content length. */ 104 0, /* Padding length. */ 105 0, /* Reserved. */ 106 }; 107 108 109 static const uint8_t nxt_fastcgi_stdin_record[] = { 110 1, /* FastCGI version. */ 111 NXT_FASTCGI_STDIN, /* The STDIN record type. */ 112 0, 1, /* Request ID. */ 113 0, 0, /* Content length. */ 114 0, /* Padding length. */ 115 0, /* Reserved. */ 116 }; 117 118 119 void 120 nxt_fastcgi_source_handler(nxt_upstream_source_t *us, 121 nxt_fastcgi_source_request_create_t request_create) 122 { 123 nxt_stream_source_t *stream; 124 nxt_fastcgi_source_t *fs; 125 126 fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t)); 127 if (nxt_slow_path(fs == NULL)) { 128 goto fail; 129 } 130 131 us->protocol_source = fs; 132 133 fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, 134 sizeof(nxt_name_value_t)); 135 if (nxt_slow_path(fs->header_in.list == NULL)) { 136 goto fail; 137 } 138 139 fs->header_in.hash = us->header_hash; 140 fs->upstream = us; 141 fs->request_create = request_create; 142 143 stream = us->stream; 144 145 if (stream == NULL) { 146 stream = nxt_mem_zalloc(us->buffers.mem_pool, 147 sizeof(nxt_stream_source_t)); 148 if (nxt_slow_path(stream == NULL)) { 149 goto fail; 150 } 151 152 us->stream = stream; 153 stream->upstream = us; 154 155 } else { 156 nxt_memzero(stream, sizeof(nxt_stream_source_t)); 157 } 158 159 /* 160 * Create the FastCGI source filter chain: 161 * stream source | FastCGI record filter | FastCGI HTTP header filter 162 */ 163 stream->next = &fs->query; 164 stream->error_handler = nxt_fastcgi_source_error; 165 166 fs->record.next.context = fs; 167 fs->record.next.filter = nxt_fastcgi_source_header_filter; 168 169 fs->record.parse.last_buf = nxt_fastcgi_source_last_buf; 170 fs->record.parse.data = fs; 171 fs->record.parse.mem_pool = us->buffers.mem_pool; 172 173 fs->query.context = &fs->record.parse; 174 fs->query.filter = nxt_fastcgi_source_record_filter; 175 176 fs->header_in.content_length = -1; 177 178 stream->out = nxt_fastcgi_request_create(fs); 179 180 if (nxt_fast_path(stream->out != NULL)) { 181 nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t)); 182 fs->u.header.mem_pool = fs->upstream->buffers.mem_pool; 183 184 nxt_stream_source_connect(stream); 185 return; 186 } 187 188 fail: 189 190 nxt_fastcgi_source_fail(fs); 191 } 192 193 194 static nxt_buf_t * 195 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs) 196 { 197 u_char *p, *record_length; 198 size_t len, size, max_record_size; 199 nxt_int_t ret; 200 nxt_buf_t *b, *req, **prev; 201 nxt_bool_t begin_request; 202 nxt_fastcgi_param_t param; 203 204 nxt_thread_log_debug("fastcgi request"); 205 206 begin_request = 1; 207 param.len = 0; 208 prev = &req; 209 210 new_buffer: 211 212 ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0); 213 if (nxt_slow_path(ret != NXT_OK)) { 214 return NULL; 215 } 216 217 b = fs->upstream->buffers.current; 218 fs->upstream->buffers.current = NULL; 219 220 *prev = b; 221 prev = &b->next; 222 223 new_record: 224 225 size = b->mem.end - b->mem.free; 226 size = nxt_align_size(size, 8) - 8; 227 /* The maximal FastCGI record content size is 65535. 65528 is 64K - 8. */ 228 max_record_size = nxt_min(65528, size); 229 230 p = b->mem.free; 231 232 if (begin_request) { 233 /* TODO: fastcgi keep conn in flags. */ 234 p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16); 235 max_record_size -= 16; 236 begin_request = 0; 237 } 238 239 b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8); 240 record_length = &p[4]; 241 size = 0; 242 243 for ( ;; ) { 244 if (param.len == 0) { 245 ret = nxt_fastcgi_next_param(fs, ¶m); 246 247 if (nxt_slow_path(ret != NXT_OK)) { 248 249 if (nxt_slow_path(ret == NXT_ERROR)) { 250 return NULL; 251 } 252 253 /* ret == NXT_DONE */ 254 break; 255 } 256 } 257 258 len = max_record_size; 259 260 if (nxt_fast_path(len >= param.len)) { 261 len = param.len; 262 param.len = 0; 263 264 } else { 265 param.len -= len; 266 } 267 268 nxt_thread_log_debug("fastcgi copy len:%uz", len); 269 270 b->mem.free = nxt_cpymem(b->mem.free, param.buf, len); 271 272 size += len; 273 max_record_size -= len; 274 275 if (nxt_slow_path(param.len != 0)) { 276 /* The record is full. */ 277 278 param.buf += len; 279 280 nxt_thread_log_debug("fastcgi content size:%uz", size); 281 282 nxt_fastcgi_set_record_length(record_length, size); 283 284 /* The minimal size of aligned record with content is 16 bytes. */ 285 if (b->mem.end - b->mem.free >= 16) { 286 goto new_record; 287 } 288 289 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, 290 b->mem.pos); 291 goto new_buffer; 292 } 293 } 294 295 nxt_thread_log_debug("fastcgi content size:%uz", size); 296 297 nxt_fastcgi_set_record_length(record_length, size); 298 299 /* A padding length. */ 300 size = 8 - size % 8; 301 record_length[2] = (u_char) size; 302 nxt_memzero(b->mem.free, size); 303 b->mem.free += size; 304 305 nxt_thread_log_debug("fastcgi padding:%uz", size); 306 307 if (b->mem.end - b->mem.free < 16) { 308 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 309 310 b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0); 311 if (nxt_slow_path(b == NULL)) { 312 return NULL; 313 } 314 315 *prev = b; 316 prev = &b->next; 317 } 318 319 /* The end of FastCGI params. */ 320 p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8); 321 322 /* The end of FastCGI stdin. */ 323 b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8); 324 325 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 326 327 return req; 328 } 329 330 331 static nxt_int_t 332 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param) 333 { 334 nxt_int_t ret; 335 336 enum { 337 sw_name_length = 0, 338 sw_value_length, 339 sw_name, 340 sw_value, 341 }; 342 343 switch (fs->state) { 344 345 case sw_name_length: 346 ret = fs->request_create(fs); 347 348 if (nxt_slow_path(ret != NXT_OK)) { 349 return ret; 350 } 351 352 nxt_thread_log_debug("fastcgi param \"%V: %V\"", 353 &fs->u.request.name, &fs->u.request.value); 354 355 fs->state = sw_value_length; 356 param->buf = param->length; 357 param->len = nxt_fastcgi_param_length(param->length, 358 fs->u.request.name.len); 359 break; 360 361 case sw_value_length: 362 fs->state = sw_name; 363 param->buf = param->length; 364 param->len = nxt_fastcgi_param_length(param->length, 365 fs->u.request.value.len); 366 break; 367 368 case sw_name: 369 fs->state = sw_value; 370 param->buf = fs->u.request.name.data; 371 param->len = fs->u.request.name.len; 372 break; 373 374 case sw_value: 375 fs->state = sw_name_length; 376 param->buf = fs->u.request.value.data; 377 param->len = fs->u.request.value.len; 378 break; 379 } 380 381 return NXT_OK; 382 } 383 384 385 static void 386 nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data) 387 { 388 size_t size; 389 u_char *p; 390 nxt_buf_t *b, *in; 391 nxt_fastcgi_source_t *fs; 392 nxt_fastcgi_source_record_t *fsr; 393 394 fsr = obj; 395 in = data; 396 397 nxt_log_debug(thr->log, "fastcgi source record filter"); 398 399 if (nxt_slow_path(fsr->parse.done)) { 400 return; 401 } 402 403 nxt_fastcgi_record_parse(&fsr->parse, in); 404 405 fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record); 406 407 if (fsr->parse.error) { 408 nxt_fastcgi_source_fail(fs); 409 return; 410 } 411 412 if (fsr->parse.fastcgi_error) { 413 /* 414 * Output all parsed before a FastCGI record error and close upstream. 415 */ 416 nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error, 417 fs, NULL, thr->log); 418 } 419 420 /* Log FastCGI stderr output. */ 421 422 for (b = fsr->parse.out[1]; b != NULL; b = b->next) { 423 424 for (p = b->mem.free - 1; p >= b->mem.pos; p--) { 425 if (*p != NXT_CR && *p != NXT_LF) { 426 break; 427 } 428 } 429 430 size = (p + 1) - b->mem.pos; 431 432 if (size != 0) { 433 nxt_log_error(NXT_LOG_ERR, thr->log, 434 "upstream sent in FastCGI stderr: \"%*s\"", 435 size, b->mem.pos); 436 } 437 438 b->completion_handler(thr, b, b->parent); 439 } 440 441 /* Process FastCGI stdout output. */ 442 443 if (fsr->parse.out[0] != NULL) { 444 nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next, 445 fsr->parse.out[0]); 446 } 447 } 448 449 450 static void 451 nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data) 452 { 453 nxt_fastcgi_source_t *fs; 454 455 fs = obj; 456 457 nxt_fastcgi_source_fail(fs); 458 } 459 460 461 static void 462 nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) 463 { 464 nxt_int_t ret; 465 nxt_buf_t *b; 466 nxt_fastcgi_source_t *fs; 467 468 fs = obj; 469 b = data; 470 471 do { 472 nxt_log_debug(thr->log, "fastcgi source header filter"); 473 474 if (nxt_slow_path(nxt_buf_is_sync(b))) { 475 nxt_fastcgi_source_sync_buffer(thr, fs, b); 476 return; 477 } 478 479 for ( ;; ) { 480 ret = nxt_http_split_header_parse(&fs->u.header, &b->mem); 481 482 if (nxt_slow_path(ret != NXT_OK)) { 483 break; 484 } 485 486 ret = nxt_fastcgi_source_header_process(fs); 487 488 if (nxt_slow_path(ret != NXT_OK)) { 489 break; 490 } 491 } 492 493 if (nxt_fast_path(ret == NXT_DONE)) { 494 nxt_log_debug(thr->log, "fastcgi source header done"); 495 nxt_fastcgi_source_header_ready(fs, b); 496 return; 497 } 498 499 if (nxt_fast_path(ret != NXT_AGAIN)) { 500 501 if (ret != NXT_ERROR) { 502 /* n == NXT_DECLINED: "\r" is not followed by "\n" */ 503 nxt_log_error(NXT_LOG_ERR, thr->log, 504 "upstream sent invalid header line: \"%*s\\r...\"", 505 fs->u.header.parse.header_end 506 - fs->u.header.parse.header_name_start, 507 fs->u.header.parse.header_name_start); 508 } 509 510 /* ret == NXT_ERROR */ 511 512 nxt_fastcgi_source_fail(fs); 513 return; 514 } 515 516 b = b->next; 517 518 } while (b != NULL); 519 } 520 521 522 static void 523 nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs, 524 nxt_buf_t *b) 525 { 526 if (nxt_buf_is_last(b)) { 527 nxt_log_error(NXT_LOG_ERR, thr->log, 528 "upstream closed prematurely connection"); 529 530 } else { 531 nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not " 532 "enough to process upstream response header", 533 fs->upstream->buffers.max, 534 fs->upstream->buffers.size); 535 } 536 537 /* The stream source sends only the last and the nobuf sync buffer. */ 538 539 nxt_fastcgi_source_fail(fs); 540 } 541 542 543 static nxt_int_t 544 nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs) 545 { 546 size_t len; 547 nxt_thread_t *thr; 548 nxt_name_value_t *nv; 549 nxt_lvlhsh_query_t lhq; 550 nxt_http_header_parse_t *hp; 551 nxt_upstream_name_value_t *unv; 552 553 thr = nxt_thread(); 554 hp = &fs->u.header.parse; 555 556 len = hp->header_name_end - hp->header_name_start; 557 558 if (len > 255) { 559 nxt_log_error(NXT_LOG_INFO, thr->log, 560 "upstream sent too long header field name: \"%*s\"", 561 len, hp->header_name_start); 562 return NXT_ERROR; 563 } 564 565 nv = nxt_list_add(fs->header_in.list); 566 if (nxt_slow_path(nv == NULL)) { 567 return NXT_ERROR; 568 } 569 570 nv->hash = hp->header_hash; 571 nv->skip = 0; 572 nv->name_len = len; 573 nv->name_start = hp->header_name_start; 574 nv->value_len = hp->header_end - hp->header_start; 575 nv->value_start = hp->header_start; 576 577 nxt_log_debug(thr->log, "http header: \"%*s: %*s\"", 578 nv->name_len, nv->name_start, nv->value_len, nv->value_start); 579 580 lhq.key_hash = nv->hash; 581 lhq.key.len = nv->name_len; 582 lhq.key.data = nv->name_start; 583 lhq.proto = &nxt_upstream_header_hash_proto; 584 585 if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) { 586 unv = lhq.value; 587 588 if (unv->handler(fs->upstream, nv) == NXT_OK) { 589 return NXT_ERROR; 590 } 591 } 592 593 return NXT_OK; 594 } 595 596 597 static const nxt_upstream_name_value_t nxt_fastcgi_source_headers[] 598 nxt_aligned(32) = 599 { 600 { nxt_fastcgi_source_status, 601 nxt_upstream_name_value("status") }, 602 603 { nxt_fastcgi_source_content_length, 604 nxt_upstream_name_value("content-length") }, 605 }; 606 607 608 nxt_int_t 609 nxt_fastcgi_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh) 610 { 611 return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers, 612 nxt_nitems(nxt_fastcgi_source_headers)); 613 } 614 615 616 static nxt_int_t 617 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv) 618 { 619 nxt_int_t n; 620 nxt_str_t s; 621 nxt_fastcgi_source_t *fs; 622 623 s.len = nv->value_len; 624 s.data = nv->value_start; 625 626 n = nxt_str_int_parse(&s); 627 628 if (nxt_fast_path(n > 0)) { 629 fs = us->protocol_source; 630 fs->header_in.status = n; 631 return NXT_OK; 632 } 633 634 return NXT_ERROR; 635 } 636 637 638 static nxt_int_t 639 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 640 nxt_name_value_t *nv) 641 { 642 nxt_off_t length; 643 nxt_fastcgi_source_t *fs; 644 645 length = nxt_off_t_parse(nv->value_start, nv->value_len); 646 647 if (nxt_fast_path(length > 0)) { 648 fs = us->protocol_source; 649 fs->header_in.content_length = length; 650 return NXT_OK; 651 } 652 653 return NXT_ERROR; 654 } 655 656 657 static void 658 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b) 659 { 660 /* 661 * Change the FastCGI source filter chain: 662 * stream source | FastCGI record filter | FastCGI body filter 663 */ 664 fs->record.next.filter = nxt_fastcgi_source_body_filter; 665 666 if (nxt_buf_mem_used_size(&b->mem) != 0) { 667 fs->rest = b; 668 } 669 670 if (fs->header_in.status == 0) { 671 /* The "200 OK" status by default. */ 672 fs->header_in.status = 200; 673 } 674 675 fs->upstream->state->ready_handler(fs); 676 } 677 678 679 /* 680 * The FastCGI source body filter accumulates first body buffers before the next 681 * filter will be established and sets completion handler for the last buffer. 682 */ 683 684 static void 685 nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data) 686 { 687 nxt_buf_t *b, *in; 688 nxt_fastcgi_source_t *fs; 689 690 fs = obj; 691 in = data; 692 693 nxt_log_debug(thr->log, "fastcgi source body filter"); 694 695 for (b = in; b != NULL; b = b->next) { 696 697 if (nxt_buf_is_last(b)) { 698 b->data = fs->upstream->data; 699 b->completion_handler = fs->upstream->state->completion_handler; 700 } 701 } 702 703 if (fs->next != NULL) { 704 nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in); 705 return; 706 } 707 708 nxt_buf_chain_add(&fs->rest, in); 709 } 710 711 712 static nxt_buf_t * 713 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp) 714 { 715 nxt_buf_t *b; 716 nxt_fastcgi_source_t *fs; 717 718 fs = fp->data; 719 720 b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST); 721 722 if (nxt_fast_path(b != NULL)) { 723 b->data = fs->upstream->data; 724 b->completion_handler = fs->upstream->state->completion_handler; 725 } 726 727 return b; 728 } 729 730 731 static void 732 nxt_fastcgi_source_error(nxt_stream_source_t *stream) 733 { 734 nxt_fastcgi_source_t *fs; 735 736 nxt_thread_log_debug("fastcgi source error"); 737 738 fs = stream->upstream->protocol_source; 739 740 nxt_fastcgi_source_fail(fs); 741 } 742 743 744 static void 745 nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs) 746 { 747 nxt_thread_t *thr; 748 749 thr = nxt_thread(); 750 751 nxt_log_debug(thr->log, "fastcgi source fail"); 752 753 /* TODO: fail, next upstream, or bad gateway */ 754 755 fs->upstream->state->error_handler(thr, fs, NULL); 756 } 757