nxt_fastcgi_source.c (0:a63ceefd6ab0) nxt_fastcgi_source.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 35 unchanged lines hidden (view full) ---

44 return 4;
45}
46
47
48static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
49static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
50 nxt_fastcgi_param_t *param);
51
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 35 unchanged lines hidden (view full) ---

44 return 4;
45}
46
47
48static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
49static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
50 nxt_fastcgi_param_t *param);
51
52static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj,
52static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
53 void *data);
53 void *data);
54static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj,
54static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
55 void *data);
55 void *data);
56static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj,
56static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
57 void *data);
57 void *data);
58static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr,
58static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
59 nxt_fastcgi_source_t *fs, nxt_buf_t *b);
60
59 nxt_fastcgi_source_t *fs, nxt_buf_t *b);
60
61static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs);
61static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
62 nxt_fastcgi_source_t *fs);
62static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
63 nxt_name_value_t *nv);
64static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
65 nxt_name_value_t *nv);
66
67static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
68 nxt_buf_t *b);
63static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
64 nxt_name_value_t *nv);
65static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
66 nxt_name_value_t *nv);
67
68static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
69 nxt_buf_t *b);
69static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj,
70static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
70 void *data);
71static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
71 void *data);
72static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
72static void nxt_fastcgi_source_error(nxt_stream_source_t *stream);
73static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs);
73static void nxt_fastcgi_source_error(nxt_task_t *task,
74 nxt_stream_source_t *stream);
75static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
74
75
76/*
77 * A FastCGI request:
78 * FCGI_BEGIN_REQUEST record;
79 * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
80 * zero content length,
81 * Several FCGI_STDIN records, the last FCGI_STDIN record must have

--- 30 unchanged lines hidden (view full) ---

112 0, 1, /* Request ID. */
113 0, 0, /* Content length. */
114 0, /* Padding length. */
115 0, /* Reserved. */
116};
117
118
119void
76
77
78/*
79 * A FastCGI request:
80 * FCGI_BEGIN_REQUEST record;
81 * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
82 * zero content length,
83 * Several FCGI_STDIN records, the last FCGI_STDIN record must have

--- 30 unchanged lines hidden (view full) ---

114 0, 1, /* Request ID. */
115 0, 0, /* Content length. */
116 0, /* Padding length. */
117 0, /* Reserved. */
118};
119
120
121void
120nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
122nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
121 nxt_fastcgi_source_request_create_t request_create)
122{
123 nxt_stream_source_t *stream;
124 nxt_fastcgi_source_t *fs;
125
126 fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
127 if (nxt_slow_path(fs == NULL)) {
128 goto fail;

--- 47 unchanged lines hidden (view full) ---

176 fs->header_in.content_length = -1;
177
178 stream->out = nxt_fastcgi_request_create(fs);
179
180 if (nxt_fast_path(stream->out != NULL)) {
181 nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
182 fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
183
123 nxt_fastcgi_source_request_create_t request_create)
124{
125 nxt_stream_source_t *stream;
126 nxt_fastcgi_source_t *fs;
127
128 fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
129 if (nxt_slow_path(fs == NULL)) {
130 goto fail;

--- 47 unchanged lines hidden (view full) ---

178 fs->header_in.content_length = -1;
179
180 stream->out = nxt_fastcgi_request_create(fs);
181
182 if (nxt_fast_path(stream->out != NULL)) {
183 nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
184 fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
185
184 nxt_stream_source_connect(stream);
186 nxt_stream_source_connect(task, stream);
185 return;
186 }
187
188fail:
189
187 return;
188 }
189
190fail:
191
190 nxt_fastcgi_source_fail(fs);
192 nxt_fastcgi_source_fail(task, fs);
191}
192
193
194static nxt_buf_t *
195nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
196{
197 u_char *p, *record_length;
198 size_t len, size, max_record_size;

--- 179 unchanged lines hidden (view full) ---

378 break;
379 }
380
381 return NXT_OK;
382}
383
384
385static void
193}
194
195
196static nxt_buf_t *
197nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
198{
199 u_char *p, *record_length;
200 size_t len, size, max_record_size;

--- 179 unchanged lines hidden (view full) ---

380 break;
381 }
382
383 return NXT_OK;
384}
385
386
387static void
386nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
388nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
387{
388 size_t size;
389 u_char *p;
390 nxt_buf_t *b, *in;
391 nxt_fastcgi_source_t *fs;
392 nxt_fastcgi_source_record_t *fsr;
393
394 fsr = obj;
395 in = data;
396
389{
390 size_t size;
391 u_char *p;
392 nxt_buf_t *b, *in;
393 nxt_fastcgi_source_t *fs;
394 nxt_fastcgi_source_record_t *fsr;
395
396 fsr = obj;
397 in = data;
398
397 nxt_log_debug(thr->log, "fastcgi source record filter");
399 nxt_debug(task, "fastcgi source record filter");
398
399 if (nxt_slow_path(fsr->parse.done)) {
400 return;
401 }
402
400
401 if (nxt_slow_path(fsr->parse.done)) {
402 return;
403 }
404
403 nxt_fastcgi_record_parse(&fsr->parse, in);
405 nxt_fastcgi_record_parse(task, &fsr->parse, in);
404
405 fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
406
407 if (fsr->parse.error) {
406
407 fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
408
409 if (fsr->parse.error) {
408 nxt_fastcgi_source_fail(fs);
410 nxt_fastcgi_source_fail(task, fs);
409 return;
410 }
411
412 if (fsr->parse.fastcgi_error) {
413 /*
414 * Output all parsed before a FastCGI record error and close upstream.
415 */
411 return;
412 }
413
414 if (fsr->parse.fastcgi_error) {
415 /*
416 * Output all parsed before a FastCGI record error and close upstream.
417 */
416 nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error,
417 fs, NULL, thr->log);
418 nxt_thread_current_work_queue_add(task->thread,
419 nxt_fastcgi_source_record_error,
420 task, fs, NULL);
418 }
419
420 /* Log FastCGI stderr output. */
421
422 for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
423
424 for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
425 if (*p != NXT_CR && *p != NXT_LF) {
426 break;
427 }
428 }
429
430 size = (p + 1) - b->mem.pos;
431
432 if (size != 0) {
421 }
422
423 /* Log FastCGI stderr output. */
424
425 for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
426
427 for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
428 if (*p != NXT_CR && *p != NXT_LF) {
429 break;
430 }
431 }
432
433 size = (p + 1) - b->mem.pos;
434
435 if (size != 0) {
433 nxt_log_error(NXT_LOG_ERR, thr->log,
434 "upstream sent in FastCGI stderr: \"%*s\"",
435 size, b->mem.pos);
436 nxt_log(task, NXT_LOG_ERR,
437 "upstream sent in FastCGI stderr: \"%*s\"",
438 size, b->mem.pos);
436 }
437
439 }
440
438 b->completion_handler(thr, b, b->parent);
441 b->completion_handler(task, b, b->parent);
439 }
440
441 /* Process FastCGI stdout output. */
442
443 if (fsr->parse.out[0] != NULL) {
442 }
443
444 /* Process FastCGI stdout output. */
445
446 if (fsr->parse.out[0] != NULL) {
444 nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next,
445 fsr->parse.out[0]);
447 nxt_source_filter(task->thread, fs->upstream->work_queue, task,
448 &fsr->next, fsr->parse.out[0]);
446 }
447}
448
449
450static void
449 }
450}
451
452
453static void
451nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data)
454nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
452{
453 nxt_fastcgi_source_t *fs;
454
455 fs = obj;
456
455{
456 nxt_fastcgi_source_t *fs;
457
458 fs = obj;
459
457 nxt_fastcgi_source_fail(fs);
460 nxt_fastcgi_source_fail(task, fs);
458}
459
460
461static void
461}
462
463
464static void
462nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
465nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
463{
464 nxt_int_t ret;
465 nxt_buf_t *b;
466 nxt_fastcgi_source_t *fs;
467
468 fs = obj;
469 b = data;
470
471 do {
466{
467 nxt_int_t ret;
468 nxt_buf_t *b;
469 nxt_fastcgi_source_t *fs;
470
471 fs = obj;
472 b = data;
473
474 do {
472 nxt_log_debug(thr->log, "fastcgi source header filter");
475 nxt_debug(task, "fastcgi source header filter");
473
474 if (nxt_slow_path(nxt_buf_is_sync(b))) {
476
477 if (nxt_slow_path(nxt_buf_is_sync(b))) {
475 nxt_fastcgi_source_sync_buffer(thr, fs, b);
478 nxt_fastcgi_source_sync_buffer(task, fs, b);
476 return;
477 }
478
479 for ( ;; ) {
480 ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
481
482 if (nxt_slow_path(ret != NXT_OK)) {
483 break;
484 }
485
479 return;
480 }
481
482 for ( ;; ) {
483 ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
484
485 if (nxt_slow_path(ret != NXT_OK)) {
486 break;
487 }
488
486 ret = nxt_fastcgi_source_header_process(fs);
489 ret = nxt_fastcgi_source_header_process(task, fs);
487
488 if (nxt_slow_path(ret != NXT_OK)) {
489 break;
490 }
491 }
492
493 if (nxt_fast_path(ret == NXT_DONE)) {
490
491 if (nxt_slow_path(ret != NXT_OK)) {
492 break;
493 }
494 }
495
496 if (nxt_fast_path(ret == NXT_DONE)) {
494 nxt_log_debug(thr->log, "fastcgi source header done");
497 nxt_debug(task, "fastcgi source header done");
495 nxt_fastcgi_source_header_ready(fs, b);
496 return;
497 }
498
499 if (nxt_fast_path(ret != NXT_AGAIN)) {
500
501 if (ret != NXT_ERROR) {
502 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
498 nxt_fastcgi_source_header_ready(fs, b);
499 return;
500 }
501
502 if (nxt_fast_path(ret != NXT_AGAIN)) {
503
504 if (ret != NXT_ERROR) {
505 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
503 nxt_log_error(NXT_LOG_ERR, thr->log,
504 "upstream sent invalid header line: \"%*s\\r...\"",
505 fs->u.header.parse.header_end
506 - fs->u.header.parse.header_name_start,
507 fs->u.header.parse.header_name_start);
506 nxt_log(task, NXT_LOG_ERR,
507 "upstream sent invalid header line: \"%*s\\r...\"",
508 fs->u.header.parse.header_end
509 - fs->u.header.parse.header_name_start,
510 fs->u.header.parse.header_name_start);
508 }
509
510 /* ret == NXT_ERROR */
511
511 }
512
513 /* ret == NXT_ERROR */
514
512 nxt_fastcgi_source_fail(fs);
515 nxt_fastcgi_source_fail(task, fs);
513 return;
514 }
515
516 b = b->next;
517
518 } while (b != NULL);
519}
520
521
522static void
516 return;
517 }
518
519 b = b->next;
520
521 } while (b != NULL);
522}
523
524
525static void
523nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs,
526nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
524 nxt_buf_t *b)
525{
526 if (nxt_buf_is_last(b)) {
527 nxt_buf_t *b)
528{
529 if (nxt_buf_is_last(b)) {
527 nxt_log_error(NXT_LOG_ERR, thr->log,
528 "upstream closed prematurely connection");
530 nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
529
530 } else {
531
532 } else {
531 nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
532 "enough to process upstream response header",
533 fs->upstream->buffers.max,
534 fs->upstream->buffers.size);
533 nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
534 "enough to process upstream response header",
535 fs->upstream->buffers.max, fs->upstream->buffers.size);
535 }
536
537 /* The stream source sends only the last and the nobuf sync buffer. */
538
536 }
537
538 /* The stream source sends only the last and the nobuf sync buffer. */
539
539 nxt_fastcgi_source_fail(fs);
540 nxt_fastcgi_source_fail(task, fs);
540}
541
542
543static nxt_int_t
541}
542
543
544static nxt_int_t
544nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
545nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
545{
546 size_t len;
546{
547 size_t len;
547 nxt_thread_t *thr;
548 nxt_name_value_t *nv;
549 nxt_lvlhsh_query_t lhq;
550 nxt_http_header_parse_t *hp;
551 nxt_upstream_name_value_t *unv;
552
548 nxt_name_value_t *nv;
549 nxt_lvlhsh_query_t lhq;
550 nxt_http_header_parse_t *hp;
551 nxt_upstream_name_value_t *unv;
552
553 thr = nxt_thread();
554 hp = &fs->u.header.parse;
555
556 len = hp->header_name_end - hp->header_name_start;
557
558 if (len > 255) {
553 hp = &fs->u.header.parse;
554
555 len = hp->header_name_end - hp->header_name_start;
556
557 if (len > 255) {
559 nxt_log_error(NXT_LOG_INFO, thr->log,
560 "upstream sent too long header field name: \"%*s\"",
561 len, hp->header_name_start);
558 nxt_log(task, NXT_LOG_INFO,
559 "upstream sent too long header field name: \"%*s\"",
560 len, hp->header_name_start);
562 return NXT_ERROR;
563 }
564
565 nv = nxt_list_add(fs->header_in.list);
566 if (nxt_slow_path(nv == NULL)) {
567 return NXT_ERROR;
568 }
569
570 nv->hash = hp->header_hash;
571 nv->skip = 0;
572 nv->name_len = len;
573 nv->name_start = hp->header_name_start;
574 nv->value_len = hp->header_end - hp->header_start;
575 nv->value_start = hp->header_start;
576
561 return NXT_ERROR;
562 }
563
564 nv = nxt_list_add(fs->header_in.list);
565 if (nxt_slow_path(nv == NULL)) {
566 return NXT_ERROR;
567 }
568
569 nv->hash = hp->header_hash;
570 nv->skip = 0;
571 nv->name_len = len;
572 nv->name_start = hp->header_name_start;
573 nv->value_len = hp->header_end - hp->header_start;
574 nv->value_start = hp->header_start;
575
577 nxt_log_debug(thr->log, "http header: \"%*s: %*s\"",
578 nv->name_len, nv->name_start, nv->value_len, nv->value_start);
576 nxt_debug(task, "http header: \"%*s: %*s\"",
577 nv->name_len, nv->name_start, nv->value_len, nv->value_start);
579
580 lhq.key_hash = nv->hash;
581 lhq.key.len = nv->name_len;
582 lhq.key.data = nv->name_start;
583 lhq.proto = &nxt_upstream_header_hash_proto;
584
585 if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
586 unv = lhq.value;

--- 90 unchanged lines hidden (view full) ---

677
678
679/*
680 * The FastCGI source body filter accumulates first body buffers before the next
681 * filter will be established and sets completion handler for the last buffer.
682 */
683
684static void
578
579 lhq.key_hash = nv->hash;
580 lhq.key.len = nv->name_len;
581 lhq.key.data = nv->name_start;
582 lhq.proto = &nxt_upstream_header_hash_proto;
583
584 if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
585 unv = lhq.value;

--- 90 unchanged lines hidden (view full) ---

676
677
678/*
679 * The FastCGI source body filter accumulates first body buffers before the next
680 * filter will be established and sets completion handler for the last buffer.
681 */
682
683static void
685nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
684nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
686{
687 nxt_buf_t *b, *in;
688 nxt_fastcgi_source_t *fs;
689
690 fs = obj;
691 in = data;
692
685{
686 nxt_buf_t *b, *in;
687 nxt_fastcgi_source_t *fs;
688
689 fs = obj;
690 in = data;
691
693 nxt_log_debug(thr->log, "fastcgi source body filter");
692 nxt_debug(task, "fastcgi source body filter");
694
695 for (b = in; b != NULL; b = b->next) {
696
697 if (nxt_buf_is_last(b)) {
698 b->data = fs->upstream->data;
699 b->completion_handler = fs->upstream->state->completion_handler;
700 }
701 }
702
703 if (fs->next != NULL) {
693
694 for (b = in; b != NULL; b = b->next) {
695
696 if (nxt_buf_is_last(b)) {
697 b->data = fs->upstream->data;
698 b->completion_handler = fs->upstream->state->completion_handler;
699 }
700 }
701
702 if (fs->next != NULL) {
704 nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in);
703 nxt_source_filter(task->thread, fs->upstream->work_queue, task,
704 fs->next, in);
705 return;
706 }
707
708 nxt_buf_chain_add(&fs->rest, in);
709}
710
711
712static nxt_buf_t *

--- 11 unchanged lines hidden (view full) ---

724 b->completion_handler = fs->upstream->state->completion_handler;
725 }
726
727 return b;
728}
729
730
731static void
705 return;
706 }
707
708 nxt_buf_chain_add(&fs->rest, in);
709}
710
711
712static nxt_buf_t *

--- 11 unchanged lines hidden (view full) ---

724 b->completion_handler = fs->upstream->state->completion_handler;
725 }
726
727 return b;
728}
729
730
731static void
732nxt_fastcgi_source_error(nxt_stream_source_t *stream)
732nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
733{
734 nxt_fastcgi_source_t *fs;
735
736 nxt_thread_log_debug("fastcgi source error");
737
738 fs = stream->upstream->protocol_source;
739
733{
734 nxt_fastcgi_source_t *fs;
735
736 nxt_thread_log_debug("fastcgi source error");
737
738 fs = stream->upstream->protocol_source;
739
740 nxt_fastcgi_source_fail(fs);
740 nxt_fastcgi_source_fail(task, fs);
741}
742
743
744static void
741}
742
743
744static void
745nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs)
745nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
746{
746{
747 nxt_thread_t *thr;
747 nxt_debug(task, "fastcgi source fail");
748
748
749 thr = nxt_thread();
750
751 nxt_log_debug(thr->log, "fastcgi source fail");
752
753 /* TODO: fail, next upstream, or bad gateway */
754
749 /* TODO: fail, next upstream, or bad gateway */
750
755 fs->upstream->state->error_handler(thr, fs, NULL);
751 fs->upstream->state->error_handler(task, fs, NULL);
756}
752}