1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #define NXT_FASTCGI_RESPONDER 1 11 #define NXT_FASTCGI_KEEP_CONN 1 12 13 14 typedef struct { 15 u_char *buf; 16 uint32_t len; 17 u_char length[4]; 18 } nxt_fastcgi_param_t; 19 20 21 #define \ 22 nxt_fastcgi_set_record_length(p, length) \ 23 do { \ 24 uint32_t len = length; \ 25 \ 26 p[1] = (u_char) len; len >>= 8; \ 27 p[0] = (u_char) len; \ 28 } while (0) 29 30 31 nxt_inline size_t 32 nxt_fastcgi_param_length(u_char *p, uint32_t length) 33 { 34 if (nxt_fast_path(length < 128)) { 35 *p = (u_char) length; 36 return 1; 37 } 38 39 p[3] = (u_char) length; length >>= 8; 40 p[2] = (u_char) length; length >>= 8; 41 p[1] = (u_char) length; length >>= 8; 42 p[0] = (u_char) (length | 0x80); 43 44 return 4; 45 } 46 47 48 static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs); 49 static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, 50 nxt_fastcgi_param_t *param); 51 52 static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, 53 void *data); 54 static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, 55 void *data); 56 static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, 57 void *data); 58 static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task, 59 nxt_fastcgi_source_t *fs, nxt_buf_t *b); 60 61 static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task, 62 nxt_fastcgi_source_t *fs); 63 static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us, 64 nxt_name_value_t *nv); 65 static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 66 nxt_name_value_t *nv); 67 68 static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, 69 nxt_buf_t *b); 70 static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, 71 void *data); 72 static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp); 73 static void nxt_fastcgi_source_error(nxt_task_t *task, 74 nxt_stream_source_t *stream); 75 static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs); 76 77 78 /* 79 * A FastCGI request: 80 * FCGI_BEGIN_REQUEST record; 81 * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have 82 * zero content length, 83 * Several FCGI_STDIN records, the last FCGI_STDIN record must have 84 * zero content length. 85 */ 86 87 static const uint8_t nxt_fastcgi_begin_request[] = { 88 1, /* FastCGI version. */ 89 NXT_FASTCGI_BEGIN_REQUEST, /* The BEGIN_REQUEST record type. */ 90 0, 1, /* Request ID. */ 91 0, 8, /* Content length of the Role record. */ 92 0, /* Padding length. */ 93 0, /* Reserved. */ 94 95 0, NXT_FASTCGI_RESPONDER, /* The Responder Role. */ 96 0, /* Flags. */ 97 0, 0, 0, 0, 0, /* Reserved. */ 98 }; 99 100 101 static const uint8_t nxt_fastcgi_params_record[] = { 102 1, /* FastCGI version. */ 103 NXT_FASTCGI_PARAMS, /* The PARAMS record type. */ 104 0, 1, /* Request ID. */ 105 0, 0, /* Content length. */ 106 0, /* Padding length. */ 107 0, /* Reserved. */ 108 }; 109 110 111 static const uint8_t nxt_fastcgi_stdin_record[] = { 112 1, /* FastCGI version. */ 113 NXT_FASTCGI_STDIN, /* The STDIN record type. */ 114 0, 1, /* Request ID. */ 115 0, 0, /* Content length. */ 116 0, /* Padding length. */ 117 0, /* Reserved. */ 118 }; 119 120 121 void 122 nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, 123 nxt_fastcgi_source_request_create_t request_create) 124 { 125 nxt_stream_source_t *stream; 126 nxt_fastcgi_source_t *fs; 127 128 fs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t)); 129 if (nxt_slow_path(fs == NULL)) { 130 goto fail; 131 } 132 133 us->protocol_source = fs; 134 135 fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, 136 sizeof(nxt_name_value_t)); 137 if (nxt_slow_path(fs->header_in.list == NULL)) { 138 goto fail; 139 } 140 141 fs->header_in.hash = us->header_hash; 142 fs->upstream = us; 143 fs->request_create = request_create; 144 145 stream = us->stream; 146 147 if (stream == NULL) { 148 stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t)); 149 if (nxt_slow_path(stream == NULL)) { 150 goto fail; 151 } 152 153 us->stream = stream; 154 stream->upstream = us; 155 156 } else { 157 nxt_memzero(stream, sizeof(nxt_stream_source_t)); 158 } 159 160 /* 161 * Create the FastCGI source filter chain: 162 * stream source | FastCGI record filter | FastCGI HTTP header filter 163 */ 164 stream->next = &fs->query; 165 stream->error_handler = nxt_fastcgi_source_error; 166 167 fs->record.next.context = fs; 168 fs->record.next.filter = nxt_fastcgi_source_header_filter; 169 170 fs->record.parse.last_buf = nxt_fastcgi_source_last_buf; 171 fs->record.parse.data = fs; 172 fs->record.parse.mem_pool = us->buffers.mem_pool; 173 174 fs->query.context = &fs->record.parse; 175 fs->query.filter = nxt_fastcgi_source_record_filter; 176 177 fs->header_in.content_length = -1; 178 179 stream->out = nxt_fastcgi_request_create(fs); 180 181 if (nxt_fast_path(stream->out != NULL)) { 182 nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t)); 183 fs->u.header.mem_pool = fs->upstream->buffers.mem_pool; 184 185 nxt_stream_source_connect(task, stream); 186 return; 187 } 188 189 fail: 190 191 nxt_fastcgi_source_fail(task, fs); 192 } 193 194 195 static nxt_buf_t * 196 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs) 197 { 198 u_char *p, *record_length; 199 size_t len, size, max_record_size; 200 nxt_int_t ret; 201 nxt_buf_t *b, *req, **prev; 202 nxt_bool_t begin_request; 203 nxt_fastcgi_param_t param; 204 205 nxt_thread_log_debug("fastcgi request"); 206 207 begin_request = 1; 208 param.len = 0; 209 prev = &req; 210 211 new_buffer: 212 213 ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0); 214 if (nxt_slow_path(ret != NXT_OK)) { 215 return NULL; 216 } 217 218 b = fs->upstream->buffers.current; 219 fs->upstream->buffers.current = NULL; 220 221 *prev = b; 222 prev = &b->next; 223 224 new_record: 225 226 size = b->mem.end - b->mem.free; 227 size = nxt_align_size(size, 8) - 8; 228 /* The maximal FastCGI record content size is 65535. 65528 is 64K - 8. */ 229 max_record_size = nxt_min(65528, size); 230 231 p = b->mem.free; 232 233 if (begin_request) { 234 /* TODO: fastcgi keep conn in flags. */ 235 p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16); 236 max_record_size -= 16; 237 begin_request = 0; 238 } 239 240 b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8); 241 record_length = &p[4]; 242 size = 0; 243 244 for ( ;; ) { 245 if (param.len == 0) { 246 ret = nxt_fastcgi_next_param(fs, ¶m); 247 248 if (nxt_slow_path(ret != NXT_OK)) { 249 250 if (nxt_slow_path(ret == NXT_ERROR)) { 251 return NULL; 252 } 253 254 /* ret == NXT_DONE */ 255 break; 256 } 257 } 258 259 len = max_record_size; 260 261 if (nxt_fast_path(len >= param.len)) { 262 len = param.len; 263 param.len = 0; 264 265 } else { 266 param.len -= len; 267 } 268 269 nxt_thread_log_debug("fastcgi copy len:%uz", len); 270 271 b->mem.free = nxt_cpymem(b->mem.free, param.buf, len); 272 273 size += len; 274 max_record_size -= len; 275 276 if (nxt_slow_path(param.len != 0)) { 277 /* The record is full. */ 278 279 param.buf += len; 280 281 nxt_thread_log_debug("fastcgi content size:%uz", size); 282 283 nxt_fastcgi_set_record_length(record_length, size); 284 285 /* The minimal size of aligned record with content is 16 bytes. */ 286 if (b->mem.end - b->mem.free >= 16) { 287 goto new_record; 288 } 289 290 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, 291 b->mem.pos); 292 goto new_buffer; 293 } 294 } 295 296 nxt_thread_log_debug("fastcgi content size:%uz", size); 297 298 nxt_fastcgi_set_record_length(record_length, size); 299 300 /* A padding length. */ 301 size = 8 - size % 8; 302 record_length[2] = (u_char) size; 303 nxt_memzero(b->mem.free, size); 304 b->mem.free += size; 305 306 nxt_thread_log_debug("fastcgi padding:%uz", size); 307 308 if (b->mem.end - b->mem.free < 16) { 309 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 310 311 b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0); 312 if (nxt_slow_path(b == NULL)) { 313 return NULL; 314 } 315 316 *prev = b; 317 prev = &b->next; 318 } 319 320 /* The end of FastCGI params. */ 321 p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8); 322 323 /* The end of FastCGI stdin. */ 324 b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8); 325 326 nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); 327 328 return req; 329 } 330 331 332 static nxt_int_t 333 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param) 334 { 335 nxt_int_t ret; 336 337 enum { 338 sw_name_length = 0, 339 sw_value_length, 340 sw_name, 341 sw_value, 342 }; 343 344 switch (fs->state) { 345 346 case sw_name_length: 347 ret = fs->request_create(fs); 348 349 if (nxt_slow_path(ret != NXT_OK)) { 350 return ret; 351 } 352 353 nxt_thread_log_debug("fastcgi param \"%V: %V\"", 354 &fs->u.request.name, &fs->u.request.value); 355 356 fs->state = sw_value_length; 357 param->buf = param->length; 358 param->len = nxt_fastcgi_param_length(param->length, 359 fs->u.request.name.len); 360 break; 361 362 case sw_value_length: 363 fs->state = sw_name; 364 param->buf = param->length; 365 param->len = nxt_fastcgi_param_length(param->length, 366 fs->u.request.value.len); 367 break; 368 369 case sw_name: 370 fs->state = sw_value; 371 param->buf = fs->u.request.name.data; 372 param->len = fs->u.request.name.len; 373 break; 374 375 case sw_value: 376 fs->state = sw_name_length; 377 param->buf = fs->u.request.value.data; 378 param->len = fs->u.request.value.len; 379 break; 380 } 381 382 return NXT_OK; 383 } 384 385 386 static void 387 nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data) 388 { 389 size_t size; 390 u_char *p; 391 nxt_buf_t *b, *in; 392 nxt_fastcgi_source_t *fs; 393 nxt_fastcgi_source_record_t *fsr; 394 395 fsr = obj; 396 in = data; 397 398 nxt_debug(task, "fastcgi source record filter"); 399 400 if (nxt_slow_path(fsr->parse.done)) { 401 return; 402 } 403 404 nxt_fastcgi_record_parse(task, &fsr->parse, in); 405 406 fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record); 407 408 if (fsr->parse.error) { 409 nxt_fastcgi_source_fail(task, fs); 410 return; 411 } 412 413 if (fsr->parse.fastcgi_error) { 414 /* 415 * Output all parsed before a FastCGI record error and close upstream. 416 */ 417 nxt_thread_current_work_queue_add(task->thread, 418 nxt_fastcgi_source_record_error, 419 task, fs, NULL); 420 } 421 422 /* Log FastCGI stderr output. */ 423 424 for (b = fsr->parse.out[1]; b != NULL; b = b->next) { 425 426 for (p = b->mem.free - 1; p >= b->mem.pos; p--) { 427 if (*p != NXT_CR && *p != NXT_LF) { 428 break; 429 } 430 } 431 432 size = (p + 1) - b->mem.pos; 433 434 if (size != 0) { 435 nxt_log(task, NXT_LOG_ERR, 436 "upstream sent in FastCGI stderr: \"%*s\"", 437 size, b->mem.pos); 438 } 439 440 b->completion_handler(task, b, b->parent); 441 } 442 443 /* Process FastCGI stdout output. */ 444 445 if (fsr->parse.out[0] != NULL) { 446 nxt_source_filter(task->thread, fs->upstream->work_queue, task, 447 &fsr->next, fsr->parse.out[0]); 448 } 449 } 450 451 452 static void 453 nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data) 454 { 455 nxt_fastcgi_source_t *fs; 456 457 fs = obj; 458 459 nxt_fastcgi_source_fail(task, fs); 460 } 461 462 463 static void 464 nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data) 465 { 466 nxt_int_t ret; 467 nxt_buf_t *b; 468 nxt_fastcgi_source_t *fs; 469 470 fs = obj; 471 b = data; 472 473 do { 474 nxt_debug(task, "fastcgi source header filter"); 475 476 if (nxt_slow_path(nxt_buf_is_sync(b))) { 477 nxt_fastcgi_source_sync_buffer(task, fs, b); 478 return; 479 } 480 481 for ( ;; ) { 482 ret = nxt_http_split_header_parse(&fs->u.header, &b->mem); 483 484 if (nxt_slow_path(ret != NXT_OK)) { 485 break; 486 } 487 488 ret = nxt_fastcgi_source_header_process(task, fs); 489 490 if (nxt_slow_path(ret != NXT_OK)) { 491 break; 492 } 493 } 494 495 if (nxt_fast_path(ret == NXT_DONE)) { 496 nxt_debug(task, "fastcgi source header done"); 497 nxt_fastcgi_source_header_ready(fs, b); 498 return; 499 } 500 501 if (nxt_fast_path(ret != NXT_AGAIN)) { 502 503 if (ret != NXT_ERROR) { 504 /* n == NXT_DECLINED: "\r" is not followed by "\n" */ 505 nxt_log(task, NXT_LOG_ERR, 506 "upstream sent invalid header line: \"%*s\\r...\"", 507 fs->u.header.parse.header_end 508 - fs->u.header.parse.header_name_start, 509 fs->u.header.parse.header_name_start); 510 } 511 512 /* ret == NXT_ERROR */ 513 514 nxt_fastcgi_source_fail(task, fs); 515 return; 516 } 517 518 b = b->next; 519 520 } while (b != NULL); 521 } 522 523 524 static void 525 nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs, 526 nxt_buf_t *b) 527 { 528 if (nxt_buf_is_last(b)) { 529 nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection"); 530 531 } else { 532 nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not " 533 "enough to process upstream response header", 534 fs->upstream->buffers.max, fs->upstream->buffers.size); 535 } 536 537 /* The stream source sends only the last and the nobuf sync buffer. */ 538 539 nxt_fastcgi_source_fail(task, fs); 540 } 541 542 543 static nxt_int_t 544 nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs) 545 { 546 size_t len; 547 nxt_name_value_t *nv; 548 nxt_lvlhsh_query_t lhq; 549 nxt_http_header_parse_t *hp; 550 nxt_upstream_name_value_t *unv; 551 552 hp = &fs->u.header.parse; 553 554 len = hp->header_name_end - hp->header_name_start; 555 556 if (len > 255) { 557 nxt_log(task, NXT_LOG_INFO, 558 "upstream sent too long header field name: \"%*s\"", 559 len, hp->header_name_start); 560 return NXT_ERROR; 561 } 562 563 nv = nxt_list_add(fs->header_in.list); 564 if (nxt_slow_path(nv == NULL)) { 565 return NXT_ERROR; 566 } 567 568 nv->hash = hp->header_hash; 569 nv->skip = 0; 570 nv->name_len = len; 571 nv->name_start = hp->header_name_start; 572 nv->value_len = hp->header_end - hp->header_start; 573 nv->value_start = hp->header_start; 574 575 nxt_debug(task, "http header: \"%*s: %*s\"", 576 nv->name_len, nv->name_start, nv->value_len, nv->value_start); 577 578 lhq.key_hash = nv->hash; 579 lhq.key.len = nv->name_len; 580 lhq.key.data = nv->name_start; 581 lhq.proto = &nxt_upstream_header_hash_proto; 582 583 if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) { 584 unv = lhq.value; 585 586 if (unv->handler(fs->upstream, nv) == NXT_OK) { 587 return NXT_ERROR; 588 } 589 } 590 591 return NXT_OK; 592 } 593 594 595 static const nxt_upstream_name_value_t nxt_fastcgi_source_headers[] 596 nxt_aligned(32) = 597 { 598 { nxt_fastcgi_source_status, 599 nxt_upstream_name_value("status") }, 600 601 { nxt_fastcgi_source_content_length, 602 nxt_upstream_name_value("content-length") }, 603 }; 604 605 606 nxt_int_t 607 nxt_fastcgi_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh) 608 { 609 return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers, 610 nxt_nitems(nxt_fastcgi_source_headers)); 611 } 612 613 614 static nxt_int_t 615 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv) 616 { 617 nxt_int_t n; 618 nxt_str_t s; 619 nxt_fastcgi_source_t *fs; 620 621 s.len = nv->value_len; 622 s.data = nv->value_start; 623 624 n = nxt_str_int_parse(&s); 625 626 if (nxt_fast_path(n > 0)) { 627 fs = us->protocol_source; 628 fs->header_in.status = n; 629 return NXT_OK; 630 } 631 632 return NXT_ERROR; 633 } 634 635 636 static nxt_int_t 637 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, 638 nxt_name_value_t *nv) 639 { 640 nxt_off_t length; 641 nxt_fastcgi_source_t *fs; 642 643 length = nxt_off_t_parse(nv->value_start, nv->value_len); 644 645 if (nxt_fast_path(length > 0)) { 646 fs = us->protocol_source; 647 fs->header_in.content_length = length; 648 return NXT_OK; 649 } 650 651 return NXT_ERROR; 652 } 653 654 655 static void 656 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b) 657 { 658 /* 659 * Change the FastCGI source filter chain: 660 * stream source | FastCGI record filter | FastCGI body filter 661 */ 662 fs->record.next.filter = nxt_fastcgi_source_body_filter; 663 664 if (nxt_buf_mem_used_size(&b->mem) != 0) { 665 fs->rest = b; 666 } 667 668 if (fs->header_in.status == 0) { 669 /* The "200 OK" status by default. */ 670 fs->header_in.status = 200; 671 } 672 673 fs->upstream->state->ready_handler(fs); 674 } 675 676 677 /* 678 * The FastCGI source body filter accumulates first body buffers before the next 679 * filter will be established and sets completion handler for the last buffer. 680 */ 681 682 static void 683 nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data) 684 { 685 nxt_buf_t *b, *in; 686 nxt_fastcgi_source_t *fs; 687 688 fs = obj; 689 in = data; 690 691 nxt_debug(task, "fastcgi source body filter"); 692 693 for (b = in; b != NULL; b = b->next) { 694 695 if (nxt_buf_is_last(b)) { 696 b->data = fs->upstream->data; 697 b->completion_handler = fs->upstream->state->completion_handler; 698 } 699 } 700 701 if (fs->next != NULL) { 702 nxt_source_filter(task->thread, fs->upstream->work_queue, task, 703 fs->next, in); 704 return; 705 } 706 707 nxt_buf_chain_add(&fs->rest, in); 708 } 709 710 711 static nxt_buf_t * 712 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp) 713 { 714 nxt_buf_t *b; 715 nxt_fastcgi_source_t *fs; 716 717 fs = fp->data; 718 719 b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST); 720 721 if (nxt_fast_path(b != NULL)) { 722 b->data = fs->upstream->data; 723 b->completion_handler = fs->upstream->state->completion_handler; 724 } 725 726 return b; 727 } 728 729 730 static void 731 nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream) 732 { 733 nxt_fastcgi_source_t *fs; 734 735 nxt_thread_log_debug("fastcgi source error"); 736 737 fs = stream->upstream->protocol_source; 738 739 nxt_fastcgi_source_fail(task, fs); 740 } 741 742 743 static void 744 nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs) 745 { 746 nxt_debug(task, "fastcgi source fail"); 747 748 /* TODO: fail, next upstream, or bad gateway */ 749 750 fs->upstream->state->error_handler(task, fs, NULL); 751 } 752