xref: /unit/src/nxt_fastcgi_source.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #define NXT_FASTCGI_RESPONDER  1
11 #define NXT_FASTCGI_KEEP_CONN  1
12 
13 
14 typedef struct {
15     u_char    *buf;
16     uint32_t  len;
17     u_char    length[4];
18 } nxt_fastcgi_param_t;
19 
20 
/*
 * Stores the low 16 bits of "length" in big-endian order into the two bytes
 * pointed to by "p", i.e. into the content length field of a FastCGI record
 * header.
 *
 * Both arguments are evaluated exactly once.  The local names carry a
 * suffix so that a caller argument called "len" or "p" is not captured
 * by the macro body.
 */
#define                                                                       \
nxt_fastcgi_set_record_length(p, length)                                      \
    do {                                                                      \
        u_char    *nxt_fcgi_rec_p_ = (u_char *) (p);                          \
        uint32_t  nxt_fcgi_rec_len_ = (length);                               \
                                                                              \
        nxt_fcgi_rec_p_[1] = (u_char) nxt_fcgi_rec_len_;                      \
        nxt_fcgi_rec_p_[0] = (u_char) (nxt_fcgi_rec_len_ >> 8);               \
    } while (0)
29 
30 
31 nxt_inline size_t
32 nxt_fastcgi_param_length(u_char *p, uint32_t length)
33 {
34     if (nxt_fast_path(length < 128)) {
35         *p = (u_char) length;
36         return 1;
37     }
38 
39     p[3] = (u_char) length;  length >>= 8;
40     p[2] = (u_char) length;  length >>= 8;
41     p[1] = (u_char) length;  length >>= 8;
42     p[0] = (u_char) (length | 0x80);
43 
44     return 4;
45 }
46 
47 
48 static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
49 static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
50     nxt_fastcgi_param_t *param);
51 
52 static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj,
53     void *data);
54 static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj,
55     void *data);
56 static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj,
57     void *data);
58 static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr,
59     nxt_fastcgi_source_t *fs, nxt_buf_t *b);
60 
61 static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs);
62 static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
63     nxt_name_value_t *nv);
64 static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
65     nxt_name_value_t *nv);
66 
67 static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
68     nxt_buf_t *b);
69 static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj,
70     void *data);
71 static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
72 static void nxt_fastcgi_source_error(nxt_stream_source_t *stream);
73 static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs);
74 
75 
76 /*
77  * A FastCGI request:
78  *   FCGI_BEGIN_REQUEST record;
79  *   Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
80  *   zero content length,
81  *   Several FCGI_STDIN records, the last FCGI_STDIN record must have
82  *   zero content length.
83  */
84 
85 static const uint8_t  nxt_fastcgi_begin_request[] = {
86     1,                                 /* FastCGI version.                   */
87     NXT_FASTCGI_BEGIN_REQUEST,         /* The BEGIN_REQUEST record type.     */
88     0, 1,                              /* Request ID.                        */
89     0, 8,                              /* Content length of the Role record. */
90     0,                                 /* Padding length.                    */
91     0,                                 /* Reserved.                          */
92 
93     0, NXT_FASTCGI_RESPONDER,          /* The Responder Role.                */
94     0,                                 /* Flags.                             */
95     0, 0, 0, 0, 0,                     /* Reserved.                          */
96 };
97 
98 
99 static const uint8_t  nxt_fastcgi_params_record[] = {
100     1,                                 /* FastCGI version.                   */
101     NXT_FASTCGI_PARAMS,                /* The PARAMS record type.            */
102     0, 1,                              /* Request ID.                        */
103     0, 0,                              /* Content length.                    */
104     0,                                 /* Padding length.                    */
105     0,                                 /* Reserved.                          */
106 };
107 
108 
109 static const uint8_t  nxt_fastcgi_stdin_record[] = {
110     1,                                 /* FastCGI version.                   */
111     NXT_FASTCGI_STDIN,                 /* The STDIN record type.             */
112     0, 1,                              /* Request ID.                        */
113     0, 0,                              /* Content length.                    */
114     0,                                 /* Padding length.                    */
115     0,                                 /* Reserved.                          */
116 };
117 
118 
119 void
120 nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
121     nxt_fastcgi_source_request_create_t request_create)
122 {
123     nxt_stream_source_t   *stream;
124     nxt_fastcgi_source_t  *fs;
125 
126     fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
127     if (nxt_slow_path(fs == NULL)) {
128         goto fail;
129     }
130 
131     us->protocol_source = fs;
132 
133     fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
134                                          sizeof(nxt_name_value_t));
135     if (nxt_slow_path(fs->header_in.list == NULL)) {
136         goto fail;
137     }
138 
139     fs->header_in.hash = us->header_hash;
140     fs->upstream = us;
141     fs->request_create = request_create;
142 
143     stream = us->stream;
144 
145     if (stream == NULL) {
146         stream = nxt_mem_zalloc(us->buffers.mem_pool,
147                                 sizeof(nxt_stream_source_t));
148         if (nxt_slow_path(stream == NULL)) {
149             goto fail;
150         }
151 
152         us->stream = stream;
153         stream->upstream = us;
154 
155     } else {
156         nxt_memzero(stream, sizeof(nxt_stream_source_t));
157     }
158 
159     /*
160      * Create the FastCGI source filter chain:
161      *   stream source | FastCGI record filter | FastCGI HTTP header filter
162      */
163     stream->next = &fs->query;
164     stream->error_handler = nxt_fastcgi_source_error;
165 
166     fs->record.next.context = fs;
167     fs->record.next.filter = nxt_fastcgi_source_header_filter;
168 
169     fs->record.parse.last_buf = nxt_fastcgi_source_last_buf;
170     fs->record.parse.data = fs;
171     fs->record.parse.mem_pool = us->buffers.mem_pool;
172 
173     fs->query.context = &fs->record.parse;
174     fs->query.filter = nxt_fastcgi_source_record_filter;
175 
176     fs->header_in.content_length = -1;
177 
178     stream->out = nxt_fastcgi_request_create(fs);
179 
180     if (nxt_fast_path(stream->out != NULL)) {
181         nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
182         fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
183 
184         nxt_stream_source_connect(stream);
185         return;
186     }
187 
188 fail:
189 
190     nxt_fastcgi_source_fail(fs);
191 }
192 
193 
194 static nxt_buf_t *
195 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
196 {
197     u_char               *p, *record_length;
198     size_t               len, size, max_record_size;
199     nxt_int_t            ret;
200     nxt_buf_t            *b, *req, **prev;
201     nxt_bool_t           begin_request;
202     nxt_fastcgi_param_t  param;
203 
204     nxt_thread_log_debug("fastcgi request");
205 
206     begin_request = 1;
207     param.len = 0;
208     prev = &req;
209 
210 new_buffer:
211 
212     ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0);
213     if (nxt_slow_path(ret != NXT_OK)) {
214         return NULL;
215     }
216 
217     b = fs->upstream->buffers.current;
218     fs->upstream->buffers.current = NULL;
219 
220     *prev = b;
221     prev = &b->next;
222 
223 new_record:
224 
225     size = b->mem.end - b->mem.free;
226     size = nxt_align_size(size, 8) - 8;
227     /* The maximal FastCGI record content size is 65535.  65528 is 64K - 8. */
228     max_record_size = nxt_min(65528, size);
229 
230     p = b->mem.free;
231 
232     if (begin_request) {
233         /* TODO: fastcgi keep conn in flags. */
234         p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16);
235         max_record_size -= 16;
236         begin_request = 0;
237     }
238 
239     b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8);
240     record_length = &p[4];
241     size = 0;
242 
243     for ( ;; ) {
244         if (param.len == 0) {
245             ret = nxt_fastcgi_next_param(fs, &param);
246 
247             if (nxt_slow_path(ret != NXT_OK)) {
248 
249                 if (nxt_slow_path(ret == NXT_ERROR)) {
250                     return NULL;
251                 }
252 
253                 /* ret == NXT_DONE */
254                 break;
255             }
256         }
257 
258         len = max_record_size;
259 
260         if (nxt_fast_path(len >= param.len)) {
261             len = param.len;
262             param.len = 0;
263 
264         } else {
265             param.len -= len;
266         }
267 
268         nxt_thread_log_debug("fastcgi copy len:%uz", len);
269 
270         b->mem.free = nxt_cpymem(b->mem.free, param.buf, len);
271 
272         size += len;
273         max_record_size -= len;
274 
275         if (nxt_slow_path(param.len != 0)) {
276             /* The record is full. */
277 
278             param.buf += len;
279 
280             nxt_thread_log_debug("fastcgi content size:%uz", size);
281 
282             nxt_fastcgi_set_record_length(record_length, size);
283 
284             /* The minimal size of aligned record with content is 16 bytes. */
285             if (b->mem.end - b->mem.free >= 16) {
286                 goto new_record;
287             }
288 
289             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
290                                  b->mem.pos);
291             goto new_buffer;
292         }
293     }
294 
295     nxt_thread_log_debug("fastcgi content size:%uz", size);
296 
297     nxt_fastcgi_set_record_length(record_length, size);
298 
299     /* A padding length. */
300     size = 8 - size % 8;
301     record_length[2] = (u_char) size;
302     nxt_memzero(b->mem.free, size);
303     b->mem.free += size;
304 
305     nxt_thread_log_debug("fastcgi padding:%uz", size);
306 
307     if (b->mem.end - b->mem.free < 16) {
308         nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
309 
310         b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0);
311         if (nxt_slow_path(b == NULL)) {
312             return NULL;
313         }
314 
315         *prev = b;
316         prev = &b->next;
317     }
318 
319     /* The end of FastCGI params. */
320     p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8);
321 
322     /* The end of FastCGI stdin. */
323     b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8);
324 
325     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
326 
327     return req;
328 }
329 
330 
331 static nxt_int_t
332 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
333 {
334     nxt_int_t  ret;
335 
336     enum {
337          sw_name_length = 0,
338          sw_value_length,
339          sw_name,
340          sw_value,
341     };
342 
343     switch (fs->state) {
344 
345     case sw_name_length:
346         ret = fs->request_create(fs);
347 
348         if (nxt_slow_path(ret != NXT_OK)) {
349             return ret;
350         }
351 
352         nxt_thread_log_debug("fastcgi param \"%V: %V\"",
353                              &fs->u.request.name, &fs->u.request.value);
354 
355         fs->state = sw_value_length;
356         param->buf = param->length;
357         param->len = nxt_fastcgi_param_length(param->length,
358                                               fs->u.request.name.len);
359         break;
360 
361     case sw_value_length:
362         fs->state = sw_name;
363         param->buf = param->length;
364         param->len = nxt_fastcgi_param_length(param->length,
365                                               fs->u.request.value.len);
366         break;
367 
368     case sw_name:
369         fs->state = sw_value;
370         param->buf = fs->u.request.name.data;
371         param->len = fs->u.request.name.len;
372         break;
373 
374     case sw_value:
375         fs->state = sw_name_length;
376         param->buf = fs->u.request.value.data;
377         param->len = fs->u.request.value.len;
378         break;
379     }
380 
381     return NXT_OK;
382 }
383 
384 
385 static void
386 nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
387 {
388     size_t                       size;
389     u_char                       *p;
390     nxt_buf_t                    *b, *in;
391     nxt_fastcgi_source_t         *fs;
392     nxt_fastcgi_source_record_t  *fsr;
393 
394     fsr = obj;
395     in = data;
396 
397     nxt_log_debug(thr->log, "fastcgi source record filter");
398 
399     if (nxt_slow_path(fsr->parse.done)) {
400         return;
401     }
402 
403     nxt_fastcgi_record_parse(&fsr->parse, in);
404 
405     fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
406 
407     if (fsr->parse.error) {
408         nxt_fastcgi_source_fail(fs);
409         return;
410     }
411 
412     if (fsr->parse.fastcgi_error) {
413         /*
414          * Output all parsed before a FastCGI record error and close upstream.
415          */
416         nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error,
417                                           fs, NULL, thr->log);
418     }
419 
420     /* Log FastCGI stderr output. */
421 
422     for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
423 
424         for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
425             if (*p != NXT_CR && *p != NXT_LF) {
426                 break;
427             }
428         }
429 
430         size = (p + 1) - b->mem.pos;
431 
432         if (size != 0) {
433             nxt_log_error(NXT_LOG_ERR, thr->log,
434                           "upstream sent in FastCGI stderr: \"%*s\"",
435                           size, b->mem.pos);
436         }
437 
438         b->completion_handler(thr, b, b->parent);
439     }
440 
441     /* Process FastCGI stdout output. */
442 
443     if (fsr->parse.out[0] != NULL) {
444         nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next,
445                           fsr->parse.out[0]);
446     }
447 }
448 
449 
450 static void
451 nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data)
452 {
453     nxt_fastcgi_source_t  *fs;
454 
455     fs = obj;
456 
457     nxt_fastcgi_source_fail(fs);
458 }
459 
460 
461 static void
462 nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
463 {
464     nxt_int_t             ret;
465     nxt_buf_t             *b;
466     nxt_fastcgi_source_t  *fs;
467 
468     fs = obj;
469     b = data;
470 
471     do {
472         nxt_log_debug(thr->log, "fastcgi source header filter");
473 
474         if (nxt_slow_path(nxt_buf_is_sync(b))) {
475             nxt_fastcgi_source_sync_buffer(thr, fs, b);
476             return;
477         }
478 
479         for ( ;; ) {
480             ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
481 
482             if (nxt_slow_path(ret != NXT_OK)) {
483                 break;
484             }
485 
486             ret = nxt_fastcgi_source_header_process(fs);
487 
488             if (nxt_slow_path(ret != NXT_OK)) {
489                 break;
490             }
491         }
492 
493         if (nxt_fast_path(ret == NXT_DONE)) {
494             nxt_log_debug(thr->log, "fastcgi source header done");
495             nxt_fastcgi_source_header_ready(fs, b);
496             return;
497         }
498 
499         if (nxt_fast_path(ret != NXT_AGAIN)) {
500 
501             if (ret != NXT_ERROR) {
502                 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
503                 nxt_log_error(NXT_LOG_ERR, thr->log,
504                            "upstream sent invalid header line: \"%*s\\r...\"",
505                            fs->u.header.parse.header_end
506                                - fs->u.header.parse.header_name_start,
507                            fs->u.header.parse.header_name_start);
508             }
509 
510             /* ret == NXT_ERROR */
511 
512             nxt_fastcgi_source_fail(fs);
513             return;
514         }
515 
516         b = b->next;
517 
518     } while (b != NULL);
519 }
520 
521 
522 static void
523 nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs,
524     nxt_buf_t *b)
525 {
526     if (nxt_buf_is_last(b)) {
527         nxt_log_error(NXT_LOG_ERR, thr->log,
528                       "upstream closed prematurely connection");
529 
530     } else {
531         nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
532                       "enough to process upstream response header",
533                       fs->upstream->buffers.max,
534                       fs->upstream->buffers.size);
535     }
536 
537     /* The stream source sends only the last and the nobuf sync buffer. */
538 
539     nxt_fastcgi_source_fail(fs);
540 }
541 
542 
543 static nxt_int_t
544 nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
545 {
546     size_t                     len;
547     nxt_thread_t               *thr;
548     nxt_name_value_t           *nv;
549     nxt_lvlhsh_query_t         lhq;
550     nxt_http_header_parse_t    *hp;
551     nxt_upstream_name_value_t  *unv;
552 
553     thr = nxt_thread();
554     hp = &fs->u.header.parse;
555 
556     len = hp->header_name_end - hp->header_name_start;
557 
558     if (len > 255) {
559         nxt_log_error(NXT_LOG_INFO, thr->log,
560                       "upstream sent too long header field name: \"%*s\"",
561                       len, hp->header_name_start);
562         return NXT_ERROR;
563     }
564 
565     nv = nxt_list_add(fs->header_in.list);
566     if (nxt_slow_path(nv == NULL)) {
567         return NXT_ERROR;
568     }
569 
570     nv->hash = hp->header_hash;
571     nv->skip = 0;
572     nv->name_len = len;
573     nv->name_start = hp->header_name_start;
574     nv->value_len = hp->header_end - hp->header_start;
575     nv->value_start = hp->header_start;
576 
577     nxt_log_debug(thr->log, "http header: \"%*s: %*s\"",
578                   nv->name_len, nv->name_start, nv->value_len, nv->value_start);
579 
580     lhq.key_hash = nv->hash;
581     lhq.key.len = nv->name_len;
582     lhq.key.data = nv->name_start;
583     lhq.proto = &nxt_upstream_header_hash_proto;
584 
585     if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
586         unv = lhq.value;
587 
588         if (unv->handler(fs->upstream, nv) == NXT_OK) {
589             return NXT_ERROR;
590         }
591     }
592 
593     return NXT_OK;
594 }
595 
596 
597 static const nxt_upstream_name_value_t  nxt_fastcgi_source_headers[]
598     nxt_aligned(32) =
599 {
600     { nxt_fastcgi_source_status,
601       nxt_upstream_name_value("status") },
602 
603     { nxt_fastcgi_source_content_length,
604       nxt_upstream_name_value("content-length") },
605 };
606 
607 
608 nxt_int_t
609 nxt_fastcgi_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
610 {
611     return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers,
612                                         nxt_nitems(nxt_fastcgi_source_headers));
613 }
614 
615 
616 static nxt_int_t
617 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv)
618 {
619     nxt_int_t             n;
620     nxt_str_t             s;
621     nxt_fastcgi_source_t  *fs;
622 
623     s.len = nv->value_len;
624     s.data = nv->value_start;
625 
626     n = nxt_str_int_parse(&s);
627 
628     if (nxt_fast_path(n > 0)) {
629         fs = us->protocol_source;
630         fs->header_in.status = n;
631         return NXT_OK;
632     }
633 
634     return NXT_ERROR;
635 }
636 
637 
638 static nxt_int_t
639 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
640     nxt_name_value_t *nv)
641 {
642     nxt_off_t             length;
643     nxt_fastcgi_source_t  *fs;
644 
645     length = nxt_off_t_parse(nv->value_start, nv->value_len);
646 
647     if (nxt_fast_path(length > 0)) {
648         fs = us->protocol_source;
649         fs->header_in.content_length = length;
650         return NXT_OK;
651     }
652 
653     return NXT_ERROR;
654 }
655 
656 
657 static void
658 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
659 {
660     /*
661      * Change the FastCGI source filter chain:
662      *   stream source | FastCGI record filter | FastCGI body filter
663      */
664     fs->record.next.filter = nxt_fastcgi_source_body_filter;
665 
666     if (nxt_buf_mem_used_size(&b->mem) != 0) {
667         fs->rest = b;
668     }
669 
670     if (fs->header_in.status == 0) {
671         /* The "200 OK" status by default. */
672         fs->header_in.status = 200;
673     }
674 
675     fs->upstream->state->ready_handler(fs);
676 }
677 
678 
679 /*
680  * The FastCGI source body filter accumulates first body buffers before the next
681  * filter will be established and sets completion handler for the last buffer.
682  */
683 
684 static void
685 nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
686 {
687     nxt_buf_t             *b, *in;
688     nxt_fastcgi_source_t  *fs;
689 
690     fs = obj;
691     in = data;
692 
693     nxt_log_debug(thr->log, "fastcgi source body filter");
694 
695     for (b = in; b != NULL; b = b->next) {
696 
697         if (nxt_buf_is_last(b)) {
698             b->data = fs->upstream->data;
699             b->completion_handler = fs->upstream->state->completion_handler;
700         }
701     }
702 
703     if (fs->next != NULL) {
704         nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in);
705         return;
706     }
707 
708     nxt_buf_chain_add(&fs->rest, in);
709 }
710 
711 
712 static nxt_buf_t *
713 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
714 {
715     nxt_buf_t             *b;
716     nxt_fastcgi_source_t  *fs;
717 
718     fs = fp->data;
719 
720     b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST);
721 
722     if (nxt_fast_path(b != NULL)) {
723         b->data = fs->upstream->data;
724         b->completion_handler = fs->upstream->state->completion_handler;
725     }
726 
727     return b;
728 }
729 
730 
731 static void
732 nxt_fastcgi_source_error(nxt_stream_source_t *stream)
733 {
734     nxt_fastcgi_source_t  *fs;
735 
736     nxt_thread_log_debug("fastcgi source error");
737 
738     fs = stream->upstream->protocol_source;
739 
740     nxt_fastcgi_source_fail(fs);
741 }
742 
743 
744 static void
745 nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs)
746 {
747     nxt_thread_t  *thr;
748 
749     thr = nxt_thread();
750 
751     nxt_log_debug(thr->log, "fastcgi source fail");
752 
753     /* TODO: fail, next upstream, or bad gateway */
754 
755     fs->upstream->state->error_handler(thr, fs, NULL);
756 }
757