1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #define NXT_FASTCGI_RESPONDER 1 11 #define NXT_FASTCGI_KEEP_CONN 1 12 13 14 typedef struct { 15 u_char *buf; 16 uint32_t len; 17 u_char length[4]; 18 } nxt_fastcgi_param_t; 19 20 21 #define \ 22 nxt_fastcgi_set_record_length(p, length) \ 23 do { \ 24 uint32_t len = length; \ 25 \ 26 p[1] = (u_char) len; len >>= 8; \ 27 p[0] = (u_char) len; \ 28 } while (0) 29 30 31 nxt_inline size_t 32 nxt_fastcgi_param_length(u_char *p, uint32_t length) 33 { 34 if (nxt_fast_path(length < 128)) { 35 *p = (u_char) length; 36 return 1; 37 } 38 39 p[3] = (u_char) length; length >>= 8; 40 p[2] = (u_char) length; length >>= 8; 41 p[1] = (u_char) length; length >>= 8; 42 p[0] = (u_char) (length | 0x80); 43 44 return 4; 45 } 46 47 48 static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs); 49 static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, 50 nxt_fastcgi_param_t *param); 51 52 static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, 53 void *data); 54 static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, 55 void *data); 56 static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, 57 void *data); 58 static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task, 59 nxt_fastcgi_source_t *fs, nxt_buf_t *b); 60 61 static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task, 62 nxt_fastcgi_source_t *fs); 63 static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us, 64 nxt_name_value_t *nv); 65 static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 66 nxt_name_value_t *nv); 67 68 static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, 69 nxt_buf_t *b); 70 static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, 71 void *data); 72 static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp); 73 static void nxt_fastcgi_source_error(nxt_task_t *task, 74 nxt_stream_source_t *stream); 75 static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs); 76 77 78 /* 79 * A FastCGI request: 80 * FCGI_BEGIN_REQUEST record; 81 * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have 82 * zero content length, 83 * Several FCGI_STDIN records, the last FCGI_STDIN record must have 84 * zero content length. 85 */ 86 87 static const uint8_t nxt_fastcgi_begin_request[] = { 88 1, /* FastCGI version. */ 89 NXT_FASTCGI_BEGIN_REQUEST, /* The BEGIN_REQUEST record type. */ 90 0, 1, /* Request ID. */ 91 0, 8, /* Content length of the Role record. */ 92 0, /* Padding length. */ 93 0, /* Reserved. */ 94 95 0, NXT_FASTCGI_RESPONDER, /* The Responder Role. */ 96 0, /* Flags. */ 97 0, 0, 0, 0, 0, /* Reserved. */ 98 }; 99 100 101 static const uint8_t nxt_fastcgi_params_record[] = { 102 1, /* FastCGI version. */ 103 NXT_FASTCGI_PARAMS, /* The PARAMS record type. */ 104 0, 1, /* Request ID. */ 105 0, 0, /* Content length. */ 106 0, /* Padding length. */ 107 0, /* Reserved. */ 108 }; 109 110 111 static const uint8_t nxt_fastcgi_stdin_record[] = { 112 1, /* FastCGI version. */ 113 NXT_FASTCGI_STDIN, /* The STDIN record type. */ 114 0, 1, /* Request ID. */ 115 0, 0, /* Content length. */ 116 0, /* Padding length. */ 117 0, /* Reserved. */ 118 }; 119 120 121 void 122 nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, 123 nxt_fastcgi_source_request_create_t request_create) 124 { 125 nxt_stream_source_t *stream; 126 nxt_fastcgi_source_t *fs; 127 128 fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t)); 129 if (nxt_slow_path(fs == NULL)) { 130 goto fail; 131 } 132 133 us->protocol_source = fs; 134 135 fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, 136 sizeof(nxt_name_value_t)); 137 if (nxt_slow_path(fs->header_in.list == NULL)) { 138 goto fail; 139 } 140 141 fs->header_in.hash = us->header_hash; 142 fs->upstream = us; 143 fs->request_create = request_create; 144 145 stream = us->stream; 146 147 if (stream == NULL) { 148 stream = nxt_mem_zalloc(us->buffers.mem_pool, 149 sizeof(nxt_stream_source_t)); 150 if (nxt_slow_path(stream == NULL)) { 151 goto fail; 152 } 153 154 us->stream = stream; 155 stream->upstream = us; 156 157 } else { 158 nxt_memzero(stream, sizeof(nxt_stream_source_t)); 159 } 160 161 /* 162 * Create the FastCGI source filter chain: 163 * stream source | FastCGI record filter | FastCGI HTTP header filter 164 */ 165 stream->next = &fs->query; 166 stream->error_handler = nxt_fastcgi_source_error; 167 168 fs->record.next.context = fs; 169 fs->record.next.filter = nxt_fastcgi_source_header_filter; 170 171 fs->record.parse.last_buf = nxt_fastcgi_source_last_buf; 172 fs->record.parse.data = fs; 173 fs->record.parse.mem_pool = us->buffers.mem_pool; 174 175 fs->query.context = &fs->record.parse; 176 fs->query.filter = nxt_fastcgi_source_record_filter; 177 178 fs->header_in.content_length = -1; 179 180 stream->out = nxt_fastcgi_request_create(fs); 181 182 if (nxt_fast_path(stream->out != NULL)) { 183 nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t)); 184 fs->u.header.mem_pool = fs->upstream->buffers.mem_pool; 185 186 nxt_stream_source_connect(task, stream); 187 return; 188 } 189 190 fail: 191 192 nxt_fastcgi_source_fail(task, fs); 193 } 194 195 196 static nxt_buf_t * 197 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs) 198 { 199 u_char *p, *record_length; 200 size_t len, size, max_record_size; 201 nxt_int_t ret; 202 nxt_buf_t *b, *req, **prev; 203 nxt_bool_t begin_request; 204 nxt_fastcgi_param_t param; 205 206 nxt_thread_log_debug("fastcgi request"); 207 208 begin_request = 1; 209 param.len = 0; 210 prev = &req; 211 212 new_buffer: 213 214 ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0); 215 if (nxt_slow_path(ret != NXT_OK)) { 216 return NULL; 217 } 218 219 b = fs->upstream->buffers.current; 220 fs->upstream->buffers.current = NULL; 221 222 *prev = b; 223 prev = &b->next; 224 225 new_record: 226 227 size = b->mem.end - b->mem.free; 228 size = nxt_align_size(size, 8) - 8; 229 /* The maximal FastCGI record content size is 65535. 65528 is 64K - 8. */ 230 max_record_size = nxt_min(65528, size); 231 232 p = b->mem.free; 233 234 if (begin_request) { 235 /* TODO: fastcgi keep conn in flags. */ 236 p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16); 237 max_record_size -= 16; 238 begin_request = 0; 239 } 240 241 b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8); 242 record_length = &p[4]; 243 size = 0; 244 245 for ( ;; ) { 246 if (param.len == 0) { 247 ret = nxt_fastcgi_next_param(fs, ¶m); 248 249 if (nxt_slow_path(ret != NXT_OK)) { 250 251 if (nxt_slow_path(ret == NXT_ERROR)) { 252 return NULL; 253 } 254 255 /* ret == NXT_DONE */ 256 break; 257 } 258 } 259 260 len = max_record_size; 261 262 if (nxt_fast_path(len >= param.len)) { 263 len = param.len; 264 param.len = 0; 265 266 } else { 267 param.len -= len; 268 } 269 270 nxt_thread_log_debug("fastcgi copy len:%uz", len); 271 272 b->mem.free = nxt_cpymem(b->mem.free, param.buf, len); 273 274 size += len; 275 max_record_size -= len; 276 277 if (nxt_slow_path(param.len != 0)) { 278 /* The record is full. */ 279 280 param.buf += len; 281 282 nxt_thread_log_debug("fastcgi content size:%uz", size); 283 284 nxt_fastcgi_set_record_length(record_length, size); 285 286 /* The minimal size of aligned record with content is 16 bytes. */ 287 if (b->mem.end - b->mem.free >= 16) { 288 goto new_record; 289 } 290 291 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, 292 b->mem.pos); 293 goto new_buffer; 294 } 295 } 296 297 nxt_thread_log_debug("fastcgi content size:%uz", size); 298 299 nxt_fastcgi_set_record_length(record_length, size); 300 301 /* A padding length. */ 302 size = 8 - size % 8; 303 record_length[2] = (u_char) size; 304 nxt_memzero(b->mem.free, size); 305 b->mem.free += size; 306 307 nxt_thread_log_debug("fastcgi padding:%uz", size); 308 309 if (b->mem.end - b->mem.free < 16) { 310 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 311 312 b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0); 313 if (nxt_slow_path(b == NULL)) { 314 return NULL; 315 } 316 317 *prev = b; 318 prev = &b->next; 319 } 320 321 /* The end of FastCGI params. */ 322 p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8); 323 324 /* The end of FastCGI stdin. */ 325 b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8); 326 327 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 328 329 return req; 330 } 331 332 333 static nxt_int_t 334 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param) 335 { 336 nxt_int_t ret; 337 338 enum { 339 sw_name_length = 0, 340 sw_value_length, 341 sw_name, 342 sw_value, 343 }; 344 345 switch (fs->state) { 346 347 case sw_name_length: 348 ret = fs->request_create(fs); 349 350 if (nxt_slow_path(ret != NXT_OK)) { 351 return ret; 352 } 353 354 nxt_thread_log_debug("fastcgi param \"%V: %V\"", 355 &fs->u.request.name, &fs->u.request.value); 356 357 fs->state = sw_value_length; 358 param->buf = param->length; 359 param->len = nxt_fastcgi_param_length(param->length, 360 fs->u.request.name.len); 361 break; 362 363 case sw_value_length: 364 fs->state = sw_name; 365 param->buf = param->length; 366 param->len = nxt_fastcgi_param_length(param->length, 367 fs->u.request.value.len); 368 break; 369 370 case sw_name: 371 fs->state = sw_value; 372 param->buf = fs->u.request.name.data; 373 param->len = fs->u.request.name.len; 374 break; 375 376 case sw_value: 377 fs->state = sw_name_length; 378 param->buf = fs->u.request.value.data; 379 param->len = fs->u.request.value.len; 380 break; 381 } 382 383 return NXT_OK; 384 } 385 386 387 static void 388 nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data) 389 { 390 size_t size; 391 u_char *p; 392 nxt_buf_t *b, *in; 393 nxt_fastcgi_source_t *fs; 394 nxt_fastcgi_source_record_t *fsr; 395 396 fsr = obj; 397 in = data; 398 399 nxt_debug(task, "fastcgi source record filter"); 400 401 if (nxt_slow_path(fsr->parse.done)) { 402 return; 403 } 404 405 nxt_fastcgi_record_parse(task, &fsr->parse, in); 406 407 fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record); 408 409 if (fsr->parse.error) { 410 nxt_fastcgi_source_fail(task, fs); 411 return; 412 } 413 414 if (fsr->parse.fastcgi_error) { 415 /* 416 * Output all parsed before a FastCGI record error and close upstream. 417 */ 418 nxt_thread_current_work_queue_add(task->thread, 419 nxt_fastcgi_source_record_error, 420 task, fs, NULL); 421 } 422 423 /* Log FastCGI stderr output. */ 424 425 for (b = fsr->parse.out[1]; b != NULL; b = b->next) { 426 427 for (p = b->mem.free - 1; p >= b->mem.pos; p--) { 428 if (*p != NXT_CR && *p != NXT_LF) { 429 break; 430 } 431 } 432 433 size = (p + 1) - b->mem.pos; 434 435 if (size != 0) { 436 nxt_log(task, NXT_LOG_ERR, 437 "upstream sent in FastCGI stderr: \"%*s\"", 438 size, b->mem.pos); 439 } 440 441 b->completion_handler(task, b, b->parent); 442 } 443 444 /* Process FastCGI stdout output. */ 445 446 if (fsr->parse.out[0] != NULL) { 447 nxt_source_filter(task->thread, fs->upstream->work_queue, task, 448 &fsr->next, fsr->parse.out[0]); 449 } 450 } 451 452 453 static void 454 nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data) 455 { 456 nxt_fastcgi_source_t *fs; 457 458 fs = obj; 459 460 nxt_fastcgi_source_fail(task, fs); 461 } 462 463 464 static void 465 nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data) 466 { 467 nxt_int_t ret; 468 nxt_buf_t *b; 469 nxt_fastcgi_source_t *fs; 470 471 fs = obj; 472 b = data; 473 474 do { 475 nxt_debug(task, "fastcgi source header filter"); 476 477 if (nxt_slow_path(nxt_buf_is_sync(b))) { 478 nxt_fastcgi_source_sync_buffer(task, fs, b); 479 return; 480 } 481 482 for ( ;; ) { 483 ret = nxt_http_split_header_parse(&fs->u.header, &b->mem); 484 485 if (nxt_slow_path(ret != NXT_OK)) { 486 break; 487 } 488 489 ret = nxt_fastcgi_source_header_process(task, fs); 490 491 if (nxt_slow_path(ret != NXT_OK)) { 492 break; 493 } 494 } 495 496 if (nxt_fast_path(ret == NXT_DONE)) { 497 nxt_debug(task, "fastcgi source header done"); 498 nxt_fastcgi_source_header_ready(fs, b); 499 return; 500 } 501 502 if (nxt_fast_path(ret != NXT_AGAIN)) { 503 504 if (ret != NXT_ERROR) { 505 /* n == NXT_DECLINED: "\r" is not followed by "\n" */ 506 nxt_log(task, NXT_LOG_ERR, 507 "upstream sent invalid header line: \"%*s\\r...\"", 508 fs->u.header.parse.header_end 509 - fs->u.header.parse.header_name_start, 510 fs->u.header.parse.header_name_start); 511 } 512 513 /* ret == NXT_ERROR */ 514 515 nxt_fastcgi_source_fail(task, fs); 516 return; 517 } 518 519 b = b->next; 520 521 } while (b != NULL); 522 } 523 524 525 static void 526 nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs, 527 nxt_buf_t *b) 528 { 529 if (nxt_buf_is_last(b)) { 530 nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection"); 531 532 } else { 533 nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not " 534 "enough to process upstream response header", 535 fs->upstream->buffers.max, fs->upstream->buffers.size); 536 } 537 538 /* The stream source sends only the last and the nobuf sync buffer. */ 539 540 nxt_fastcgi_source_fail(task, fs); 541 } 542 543 544 static nxt_int_t 545 nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs) 546 { 547 size_t len; 548 nxt_name_value_t *nv; 549 nxt_lvlhsh_query_t lhq; 550 nxt_http_header_parse_t *hp; 551 nxt_upstream_name_value_t *unv; 552 553 hp = &fs->u.header.parse; 554 555 len = hp->header_name_end - hp->header_name_start; 556 557 if (len > 255) { 558 nxt_log(task, NXT_LOG_INFO, 559 "upstream sent too long header field name: \"%*s\"", 560 len, hp->header_name_start); 561 return NXT_ERROR; 562 } 563 564 nv = nxt_list_add(fs->header_in.list); 565 if (nxt_slow_path(nv == NULL)) { 566 return NXT_ERROR; 567 } 568 569 nv->hash = hp->header_hash; 570 nv->skip = 0; 571 nv->name_len = len; 572 nv->name_start = hp->header_name_start; 573 nv->value_len = hp->header_end - hp->header_start; 574 nv->value_start = hp->header_start; 575 576 nxt_debug(task, "http header: \"%*s: %*s\"", 577 nv->name_len, nv->name_start, nv->value_len, nv->value_start); 578 579 lhq.key_hash = nv->hash; 580 lhq.key.len = nv->name_len; 581 lhq.key.data = nv->name_start; 582 lhq.proto = &nxt_upstream_header_hash_proto; 583 584 if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) { 585 unv = lhq.value; 586 587 if (unv->handler(fs->upstream, nv) == NXT_OK) { 588 return NXT_ERROR; 589 } 590 } 591 592 return NXT_OK; 593 } 594 595 596 static const nxt_upstream_name_value_t nxt_fastcgi_source_headers[] 597 nxt_aligned(32) = 598 { 599 { nxt_fastcgi_source_status, 600 nxt_upstream_name_value("status") }, 601 602 { nxt_fastcgi_source_content_length, 603 nxt_upstream_name_value("content-length") }, 604 }; 605 606 607 nxt_int_t 608 nxt_fastcgi_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh) 609 { 610 return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers, 611 nxt_nitems(nxt_fastcgi_source_headers)); 612 } 613 614 615 static nxt_int_t 616 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv) 617 { 618 nxt_int_t n; 619 nxt_str_t s; 620 nxt_fastcgi_source_t *fs; 621 622 s.len = nv->value_len; 623 s.data = nv->value_start; 624 625 n = nxt_str_int_parse(&s); 626 627 if (nxt_fast_path(n > 0)) { 628 fs = us->protocol_source; 629 fs->header_in.status = n; 630 return NXT_OK; 631 } 632 633 return NXT_ERROR; 634 } 635 636 637 static nxt_int_t 638 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 639 nxt_name_value_t *nv) 640 { 641 nxt_off_t length; 642 nxt_fastcgi_source_t *fs; 643 644 length = nxt_off_t_parse(nv->value_start, nv->value_len); 645 646 if (nxt_fast_path(length > 0)) { 647 fs = us->protocol_source; 648 fs->header_in.content_length = length; 649 return NXT_OK; 650 } 651 652 return NXT_ERROR; 653 } 654 655 656 static void 657 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b) 658 { 659 /* 660 * Change the FastCGI source filter chain: 661 * stream source | FastCGI record filter | FastCGI body filter 662 */ 663 fs->record.next.filter = nxt_fastcgi_source_body_filter; 664 665 if (nxt_buf_mem_used_size(&b->mem) != 0) { 666 fs->rest = b; 667 } 668 669 if (fs->header_in.status == 0) { 670 /* The "200 OK" status by default. */ 671 fs->header_in.status = 200; 672 } 673 674 fs->upstream->state->ready_handler(fs); 675 } 676 677 678 /* 679 * The FastCGI source body filter accumulates first body buffers before the next 680 * filter will be established and sets completion handler for the last buffer. 681 */ 682 683 static void 684 nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data) 685 { 686 nxt_buf_t *b, *in; 687 nxt_fastcgi_source_t *fs; 688 689 fs = obj; 690 in = data; 691 692 nxt_debug(task, "fastcgi source body filter"); 693 694 for (b = in; b != NULL; b = b->next) { 695 696 if (nxt_buf_is_last(b)) { 697 b->data = fs->upstream->data; 698 b->completion_handler = fs->upstream->state->completion_handler; 699 } 700 } 701 702 if (fs->next != NULL) { 703 nxt_source_filter(task->thread, fs->upstream->work_queue, task, 704 fs->next, in); 705 return; 706 } 707 708 nxt_buf_chain_add(&fs->rest, in); 709 } 710 711 712 static nxt_buf_t * 713 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp) 714 { 715 nxt_buf_t *b; 716 nxt_fastcgi_source_t *fs; 717 718 fs = fp->data; 719 720 b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST); 721 722 if (nxt_fast_path(b != NULL)) { 723 b->data = fs->upstream->data; 724 b->completion_handler = fs->upstream->state->completion_handler; 725 } 726 727 return b; 728 } 729 730 731 static void 732 nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream) 733 { 734 nxt_fastcgi_source_t *fs; 735 736 nxt_thread_log_debug("fastcgi source error"); 737 738 fs = stream->upstream->protocol_source; 739 740 nxt_fastcgi_source_fail(task, fs); 741 } 742 743 744 static void 745 nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs) 746 { 747 nxt_debug(task, "fastcgi source fail"); 748 749 /* TODO: fail, next upstream, or bad gateway */ 750 751 fs->upstream->state->error_handler(task, fs, NULL); 752 } 753