xref: /unit/src/nxt_fastcgi_source.c (revision 2084:7d479274f334)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/* The Responder role value placed in the BEGIN_REQUEST record body. */
#define NXT_FASTCGI_RESPONDER  1
/*
 * The keep-connection flag value (FCGI_KEEP_CONN in the FastCGI
 * specification).  It is not referenced by the code visible here; see
 * the "keep conn" TODO where the BEGIN_REQUEST record is copied.
 */
#define NXT_FASTCGI_KEEP_CONN  1
12 
13 
/*
 * A cursor over the piece of a FastCGI name/value pair that is currently
 * being copied into the request buffers.
 */
typedef struct {
    /* The next byte to copy; advanced when a piece spans several records. */
    u_char    *buf;
    /* The number of bytes left to copy; zero requests the next piece. */
    uint32_t  len;
    /* Scratch space for an encoded name or value length (1 or 4 bytes). */
    u_char    length[4];
} nxt_fastcgi_param_t;
19 
20 
/*
 * Stores the low 16 bits of "length" at p[0] and p[1], the most
 * significant byte first, as required for a FastCGI record content length.
 *
 * Both arguments are parenthesized so that pointer expressions such as
 * "buf + 4" work, and the temporary has a name that cannot collide with
 * a caller's variable (a caller passing a variable called "len" must not
 * end up with a self-initialized temporary).  "p" is evaluated twice.
 */
#define nxt_fastcgi_set_record_length(p, length)                              \
    do {                                                                      \
        uint32_t  nxt_fcgi_len_ = (uint32_t) (length);                        \
                                                                              \
        (p)[1] = (u_char) nxt_fcgi_len_;                                      \
        nxt_fcgi_len_ >>= 8;                                                  \
        (p)[0] = (u_char) nxt_fcgi_len_;                                      \
    } while (0)
28 
29 
30 nxt_inline size_t
nxt_fastcgi_param_length(u_char * p,uint32_t length)31 nxt_fastcgi_param_length(u_char *p, uint32_t length)
32 {
33     if (nxt_fast_path(length < 128)) {
34         *p = (u_char) length;
35         return 1;
36     }
37 
38     p[3] = (u_char) length;  length >>= 8;
39     p[2] = (u_char) length;  length >>= 8;
40     p[1] = (u_char) length;  length >>= 8;
41     p[0] = (u_char) (length | 0x80);
42 
43     return 4;
44 }
45 
46 
/* Request creation: serializes FastCGI records into upstream buffers. */
static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
    nxt_fastcgi_param_t *param);

/* Response filters: FastCGI record parsing and HTTP header parsing. */
static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
    nxt_fastcgi_source_t *fs, nxt_buf_t *b);

/* Per-header processing and the handlers of specially treated headers. */
static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
    nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
    nxt_name_value_t *nv);
static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
    nxt_name_value_t *nv);

/* Body handling, end-of-response buffer, and failure reporting. */
static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
    nxt_buf_t *b);
static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
    void *data);
static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
static void nxt_fastcgi_source_error(nxt_task_t *task,
    nxt_stream_source_t *stream);
static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
75 
76 
77 /*
78  * A FastCGI request:
79  *   FCGI_BEGIN_REQUEST record;
80  *   Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
81  *   zero content length,
82  *   Several FCGI_STDIN records, the last FCGI_STDIN record must have
83  *   zero content length.
84  */
85 
/*
 * The prebuilt 16-byte BEGIN_REQUEST record: an 8-byte record header
 * followed by the 8-byte body.  It is copied verbatim in front of the
 * first PARAMS record when the request is created.
 */
static const uint8_t  nxt_fastcgi_begin_request[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_BEGIN_REQUEST,         /* The BEGIN_REQUEST record type.     */
    0, 1,                              /* Request ID.                        */
    0, 8,                              /* Content length of the Role record. */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */

    0, NXT_FASTCGI_RESPONDER,          /* The Responder Role.                */
    0,                                 /* Flags.                             */
    0, 0, 0, 0, 0,                     /* Reserved.                          */
};
98 
99 
/*
 * The 8-byte PARAMS record header template.  The content length and the
 * padding length are zero here; the request builder patches them in place
 * for records with content and uses the template unchanged as the empty
 * record that ends the params stream.
 */
static const uint8_t  nxt_fastcgi_params_record[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_PARAMS,                /* The PARAMS record type.            */
    0, 1,                              /* Request ID.                        */
    0, 0,                              /* Content length.                    */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */
};
108 
109 
/*
 * The 8-byte STDIN record header with zero content length.  The request
 * builder appends it unchanged as the end of the stdin stream.
 */
static const uint8_t  nxt_fastcgi_stdin_record[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_STDIN,                 /* The STDIN record type.             */
    0, 1,                              /* Request ID.                        */
    0, 0,                              /* Content length.                    */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */
};
118 
119 
120 void
nxt_fastcgi_source_handler(nxt_task_t * task,nxt_upstream_source_t * us,nxt_fastcgi_source_request_create_t request_create)121 nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
122     nxt_fastcgi_source_request_create_t request_create)
123 {
124     nxt_stream_source_t   *stream;
125     nxt_fastcgi_source_t  *fs;
126 
127     fs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
128     if (nxt_slow_path(fs == NULL)) {
129         goto fail;
130     }
131 
132     us->protocol_source = fs;
133 
134     fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
135                                          sizeof(nxt_name_value_t));
136     if (nxt_slow_path(fs->header_in.list == NULL)) {
137         goto fail;
138     }
139 
140     fs->header_in.hash = us->header_hash;
141     fs->upstream = us;
142     fs->request_create = request_create;
143 
144     stream = us->stream;
145 
146     if (stream == NULL) {
147         stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t));
148         if (nxt_slow_path(stream == NULL)) {
149             goto fail;
150         }
151 
152         us->stream = stream;
153         stream->upstream = us;
154 
155     } else {
156         nxt_memzero(stream, sizeof(nxt_stream_source_t));
157     }
158 
159     /*
160      * Create the FastCGI source filter chain:
161      *   stream source | FastCGI record filter | FastCGI HTTP header filter
162      */
163     stream->next = &fs->query;
164     stream->error_handler = nxt_fastcgi_source_error;
165 
166     fs->record.next.context = fs;
167     fs->record.next.filter = nxt_fastcgi_source_header_filter;
168 
169     fs->record.parse.last_buf = nxt_fastcgi_source_last_buf;
170     fs->record.parse.data = fs;
171     fs->record.parse.mem_pool = us->buffers.mem_pool;
172 
173     fs->query.context = &fs->record.parse;
174     fs->query.filter = nxt_fastcgi_source_record_filter;
175 
176     fs->header_in.content_length = -1;
177 
178     stream->out = nxt_fastcgi_request_create(fs);
179 
180     if (nxt_fast_path(stream->out != NULL)) {
181         nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
182         fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
183 
184         nxt_stream_source_connect(task, stream);
185         return;
186     }
187 
188 fail:
189 
190     nxt_fastcgi_source_fail(task, fs);
191 }
192 
193 
194 static nxt_buf_t *
nxt_fastcgi_request_create(nxt_fastcgi_source_t * fs)195 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
196 {
197     u_char               *p, *record_length;
198     size_t               len, size, max_record_size;
199     nxt_int_t            ret;
200     nxt_buf_t            *b, *req, **prev;
201     nxt_bool_t           begin_request;
202     nxt_fastcgi_param_t  param;
203 
204     nxt_thread_log_debug("fastcgi request");
205 
206     begin_request = 1;
207     param.len = 0;
208     prev = &req;
209 
210 new_buffer:
211 
212     ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0);
213     if (nxt_slow_path(ret != NXT_OK)) {
214         return NULL;
215     }
216 
217     b = fs->upstream->buffers.current;
218     fs->upstream->buffers.current = NULL;
219 
220     *prev = b;
221     prev = &b->next;
222 
223 new_record:
224 
225     size = b->mem.end - b->mem.free;
226     size = nxt_align_size(size, 8) - 8;
227     /* The maximal FastCGI record content size is 65535.  65528 is 64K - 8. */
228     max_record_size = nxt_min(65528, size);
229 
230     p = b->mem.free;
231 
232     if (begin_request) {
233         /* TODO: fastcgi keep conn in flags. */
234         p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16);
235         max_record_size -= 16;
236         begin_request = 0;
237     }
238 
239     b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8);
240     record_length = &p[4];
241     size = 0;
242 
243     for ( ;; ) {
244         if (param.len == 0) {
245             ret = nxt_fastcgi_next_param(fs, &param);
246 
247             if (nxt_slow_path(ret != NXT_OK)) {
248 
249                 if (nxt_slow_path(ret == NXT_ERROR)) {
250                     return NULL;
251                 }
252 
253                 /* ret == NXT_DONE */
254                 break;
255             }
256         }
257 
258         len = max_record_size;
259 
260         if (nxt_fast_path(len >= param.len)) {
261             len = param.len;
262             param.len = 0;
263 
264         } else {
265             param.len -= len;
266         }
267 
268         nxt_thread_log_debug("fastcgi copy len:%uz", len);
269 
270         b->mem.free = nxt_cpymem(b->mem.free, param.buf, len);
271 
272         size += len;
273         max_record_size -= len;
274 
275         if (nxt_slow_path(param.len != 0)) {
276             /* The record is full. */
277 
278             param.buf += len;
279 
280             nxt_thread_log_debug("fastcgi content size:%uz", size);
281 
282             nxt_fastcgi_set_record_length(record_length, size);
283 
284             /* The minimal size of aligned record with content is 16 bytes. */
285             if (b->mem.end - b->mem.free >= 16) {
286                 goto new_record;
287             }
288 
289             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
290                                  b->mem.pos);
291             goto new_buffer;
292         }
293     }
294 
295     nxt_thread_log_debug("fastcgi content size:%uz", size);
296 
297     nxt_fastcgi_set_record_length(record_length, size);
298 
299     /* A padding length. */
300     size = 8 - size % 8;
301     record_length[2] = (u_char) size;
302     nxt_memzero(b->mem.free, size);
303     b->mem.free += size;
304 
305     nxt_thread_log_debug("fastcgi padding:%uz", size);
306 
307     if (b->mem.end - b->mem.free < 16) {
308         nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
309 
310         b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0);
311         if (nxt_slow_path(b == NULL)) {
312             return NULL;
313         }
314 
315         *prev = b;
316         prev = &b->next;
317     }
318 
319     /* The end of FastCGI params. */
320     p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8);
321 
322     /* The end of FastCGI stdin. */
323     b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8);
324 
325     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
326 
327     return req;
328 }
329 
330 
331 static nxt_int_t
nxt_fastcgi_next_param(nxt_fastcgi_source_t * fs,nxt_fastcgi_param_t * param)332 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
333 {
334     nxt_int_t  ret;
335 
336     enum {
337         sw_name_length = 0,
338         sw_value_length,
339         sw_name,
340         sw_value,
341     };
342 
343     switch (fs->state) {
344 
345     case sw_name_length:
346         ret = fs->request_create(fs);
347 
348         if (nxt_slow_path(ret != NXT_OK)) {
349             return ret;
350         }
351 
352         nxt_thread_log_debug("fastcgi param \"%V: %V\"",
353                              &fs->u.request.name, &fs->u.request.value);
354 
355         fs->state = sw_value_length;
356         param->buf = param->length;
357         param->len = nxt_fastcgi_param_length(param->length,
358                                               fs->u.request.name.len);
359         break;
360 
361     case sw_value_length:
362         fs->state = sw_name;
363         param->buf = param->length;
364         param->len = nxt_fastcgi_param_length(param->length,
365                                               fs->u.request.value.len);
366         break;
367 
368     case sw_name:
369         fs->state = sw_value;
370         param->buf = fs->u.request.name.data;
371         param->len = fs->u.request.name.len;
372         break;
373 
374     case sw_value:
375         fs->state = sw_name_length;
376         param->buf = fs->u.request.value.data;
377         param->len = fs->u.request.value.len;
378         break;
379     }
380 
381     return NXT_OK;
382 }
383 
384 
385 static void
nxt_fastcgi_source_record_filter(nxt_task_t * task,void * obj,void * data)386 nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
387 {
388     size_t                       size;
389     u_char                       *p;
390     nxt_buf_t                    *b, *in;
391     nxt_fastcgi_source_t         *fs;
392     nxt_fastcgi_source_record_t  *fsr;
393 
394     fsr = obj;
395     in = data;
396 
397     nxt_debug(task, "fastcgi source record filter");
398 
399     if (nxt_slow_path(fsr->parse.done)) {
400         return;
401     }
402 
403     nxt_fastcgi_record_parse(task, &fsr->parse, in);
404 
405     fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
406 
407     if (fsr->parse.error) {
408         nxt_fastcgi_source_fail(task, fs);
409         return;
410     }
411 
412     if (fsr->parse.fastcgi_error) {
413         /*
414          * Output all parsed before a FastCGI record error and close upstream.
415          */
416         nxt_thread_current_work_queue_add(task->thread,
417                                           nxt_fastcgi_source_record_error,
418                                           task, fs, NULL);
419     }
420 
421     /* Log FastCGI stderr output. */
422 
423     for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
424 
425         for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
426             if (*p != '\r' && *p != '\n') {
427                 break;
428             }
429         }
430 
431         size = (p + 1) - b->mem.pos;
432 
433         if (size != 0) {
434             nxt_log(task, NXT_LOG_ERR,
435                     "upstream sent in FastCGI stderr: \"%*s\"",
436                     size, b->mem.pos);
437         }
438 
439         b->completion_handler(task, b, b->parent);
440     }
441 
442     /* Process FastCGI stdout output. */
443 
444     if (fsr->parse.out[0] != NULL) {
445         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
446                           &fsr->next, fsr->parse.out[0]);
447     }
448 }
449 
450 
451 static void
nxt_fastcgi_source_record_error(nxt_task_t * task,void * obj,void * data)452 nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
453 {
454     nxt_fastcgi_source_t  *fs;
455 
456     fs = obj;
457 
458     nxt_fastcgi_source_fail(task, fs);
459 }
460 
461 
462 static void
nxt_fastcgi_source_header_filter(nxt_task_t * task,void * obj,void * data)463 nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
464 {
465     nxt_int_t             ret;
466     nxt_buf_t             *b;
467     nxt_fastcgi_source_t  *fs;
468 
469     fs = obj;
470     b = data;
471 
472     do {
473         nxt_debug(task, "fastcgi source header filter");
474 
475         if (nxt_slow_path(nxt_buf_is_sync(b))) {
476             nxt_fastcgi_source_sync_buffer(task, fs, b);
477             return;
478         }
479 
480         for ( ;; ) {
481             ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
482 
483             if (nxt_slow_path(ret != NXT_OK)) {
484                 break;
485             }
486 
487             ret = nxt_fastcgi_source_header_process(task, fs);
488 
489             if (nxt_slow_path(ret != NXT_OK)) {
490                 break;
491             }
492         }
493 
494         if (nxt_fast_path(ret == NXT_DONE)) {
495             nxt_debug(task, "fastcgi source header done");
496             nxt_fastcgi_source_header_ready(fs, b);
497             return;
498         }
499 
500         if (nxt_fast_path(ret != NXT_AGAIN)) {
501 
502             if (ret != NXT_ERROR) {
503                 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
504                 nxt_log(task, NXT_LOG_ERR,
505                         "upstream sent invalid header line: \"%*s\\r...\"",
506                         fs->u.header.parse.header_end
507                             - fs->u.header.parse.header_name_start,
508                         fs->u.header.parse.header_name_start);
509             }
510 
511             /* ret == NXT_ERROR */
512 
513             nxt_fastcgi_source_fail(task, fs);
514             return;
515         }
516 
517         b = b->next;
518 
519     } while (b != NULL);
520 }
521 
522 
523 static void
nxt_fastcgi_source_sync_buffer(nxt_task_t * task,nxt_fastcgi_source_t * fs,nxt_buf_t * b)524 nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
525     nxt_buf_t *b)
526 {
527     if (nxt_buf_is_last(b)) {
528         nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
529 
530     } else {
531         nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
532                 "enough to process upstream response header",
533                 fs->upstream->buffers.max, fs->upstream->buffers.size);
534     }
535 
536     /* The stream source sends only the last and the nobuf sync buffer. */
537 
538     nxt_fastcgi_source_fail(task, fs);
539 }
540 
541 
542 static nxt_int_t
nxt_fastcgi_source_header_process(nxt_task_t * task,nxt_fastcgi_source_t * fs)543 nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
544 {
545     size_t                     len;
546     nxt_name_value_t           *nv;
547     nxt_lvlhsh_query_t         lhq;
548     nxt_http_header_parse_t    *hp;
549     nxt_upstream_name_value_t  *unv;
550 
551     hp = &fs->u.header.parse;
552 
553     len = hp->header_name_end - hp->header_name_start;
554 
555     if (len > 255) {
556         nxt_log(task, NXT_LOG_INFO,
557                 "upstream sent too long header field name: \"%*s\"",
558                 len, hp->header_name_start);
559         return NXT_ERROR;
560     }
561 
562     nv = nxt_list_add(fs->header_in.list);
563     if (nxt_slow_path(nv == NULL)) {
564         return NXT_ERROR;
565     }
566 
567     nv->hash = hp->header_hash;
568     nv->skip = 0;
569     nv->name_len = len;
570     nv->name_start = hp->header_name_start;
571     nv->value_len = hp->header_end - hp->header_start;
572     nv->value_start = hp->header_start;
573 
574     nxt_debug(task, "http header: \"%*s: %*s\"",
575               nv->name_len, nv->name_start, nv->value_len, nv->value_start);
576 
577     lhq.key_hash = nv->hash;
578     lhq.key.len = nv->name_len;
579     lhq.key.data = nv->name_start;
580     lhq.proto = &nxt_upstream_header_hash_proto;
581 
582     if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
583         unv = lhq.value;
584 
585         if (unv->handler(fs->upstream, nv) == NXT_OK) {
586             return NXT_ERROR;
587         }
588     }
589 
590     return NXT_OK;
591 }
592 
593 
/*
 * The response header fields that get special processing, each paired
 * with its handler.  The table is added to a header hash by
 * nxt_fastcgi_source_hash_create().
 */
static const nxt_upstream_name_value_t  nxt_fastcgi_source_headers[]
    nxt_aligned(32) =
{
    { nxt_fastcgi_source_status,
      nxt_upstream_name_value("status") },

    { nxt_fastcgi_source_content_length,
      nxt_upstream_name_value("content-length") },
};
603 
604 
605 nxt_int_t
nxt_fastcgi_source_hash_create(nxt_mp_t * mp,nxt_lvlhsh_t * lh)606 nxt_fastcgi_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh)
607 {
608     return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers,
609                                         nxt_nitems(nxt_fastcgi_source_headers));
610 }
611 
612 
613 static nxt_int_t
nxt_fastcgi_source_status(nxt_upstream_source_t * us,nxt_name_value_t * nv)614 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv)
615 {
616     nxt_int_t             n;
617     nxt_str_t             s;
618     nxt_fastcgi_source_t  *fs;
619 
620     s.len = nv->value_len;
621     s.data = nv->value_start;
622 
623     n = nxt_str_int_parse(&s);
624 
625     if (nxt_fast_path(n > 0)) {
626         fs = us->protocol_source;
627         fs->header_in.status = n;
628         return NXT_OK;
629     }
630 
631     return NXT_ERROR;
632 }
633 
634 
635 static nxt_int_t
nxt_fastcgi_source_content_length(nxt_upstream_source_t * us,nxt_name_value_t * nv)636 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
637     nxt_name_value_t *nv)
638 {
639     nxt_off_t             length;
640     nxt_fastcgi_source_t  *fs;
641 
642     length = nxt_off_t_parse(nv->value_start, nv->value_len);
643 
644     if (nxt_fast_path(length > 0)) {
645         fs = us->protocol_source;
646         fs->header_in.content_length = length;
647         return NXT_OK;
648     }
649 
650     return NXT_ERROR;
651 }
652 
653 
654 static void
nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t * fs,nxt_buf_t * b)655 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
656 {
657     /*
658      * Change the FastCGI source filter chain:
659      *   stream source | FastCGI record filter | FastCGI body filter
660      */
661     fs->record.next.filter = nxt_fastcgi_source_body_filter;
662 
663     if (nxt_buf_mem_used_size(&b->mem) != 0) {
664         fs->rest = b;
665     }
666 
667     if (fs->header_in.status == 0) {
668         /* The "200 OK" status by default. */
669         fs->header_in.status = 200;
670     }
671 
672     fs->upstream->state->ready_handler(fs);
673 }
674 
675 
676 /*
677  * The FastCGI source body filter accumulates first body buffers before the next
678  * filter will be established and sets completion handler for the last buffer.
679  */
680 
681 static void
nxt_fastcgi_source_body_filter(nxt_task_t * task,void * obj,void * data)682 nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
683 {
684     nxt_buf_t             *b, *in;
685     nxt_fastcgi_source_t  *fs;
686 
687     fs = obj;
688     in = data;
689 
690     nxt_debug(task, "fastcgi source body filter");
691 
692     for (b = in; b != NULL; b = b->next) {
693 
694         if (nxt_buf_is_last(b)) {
695             b->data = fs->upstream->data;
696             b->completion_handler = fs->upstream->state->completion_handler;
697         }
698     }
699 
700     if (fs->next != NULL) {
701         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
702                           fs->next, in);
703         return;
704     }
705 
706     nxt_buf_chain_add(&fs->rest, in);
707 }
708 
709 
710 static nxt_buf_t *
nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t * fp)711 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
712 {
713     nxt_buf_t             *b;
714     nxt_fastcgi_source_t  *fs;
715 
716     fs = fp->data;
717 
718     b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST);
719 
720     if (nxt_fast_path(b != NULL)) {
721         b->data = fs->upstream->data;
722         b->completion_handler = fs->upstream->state->completion_handler;
723     }
724 
725     return b;
726 }
727 
728 
729 static void
nxt_fastcgi_source_error(nxt_task_t * task,nxt_stream_source_t * stream)730 nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
731 {
732     nxt_fastcgi_source_t  *fs;
733 
734     nxt_thread_log_debug("fastcgi source error");
735 
736     fs = stream->upstream->protocol_source;
737 
738     nxt_fastcgi_source_fail(task, fs);
739 }
740 
741 
742 static void
nxt_fastcgi_source_fail(nxt_task_t * task,nxt_fastcgi_source_t * fs)743 nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
744 {
745     nxt_debug(task, "fastcgi source fail");
746 
747     /* TODO: fail, next upstream, or bad gateway */
748 
749     fs->upstream->state->error_handler(task, fs, NULL);
750 }
751