xref: /unit/src/nxt_fastcgi_source.c (revision 1:fdc027c56872)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/* Role value placed in the role field of the BEGIN_REQUEST record body. */
#define NXT_FASTCGI_RESPONDER  1
/*
 * Not referenced in the visible code; presumably the keep-connection bit
 * for the BEGIN_REQUEST flags byte (see the TODO in
 * nxt_fastcgi_request_create()) -- confirm against the FastCGI spec.
 */
#define NXT_FASTCGI_KEEP_CONN  1
12 
13 
/*
 * A cursor over the bytes of one piece of a FastCGI name-value pair that
 * still have to be copied into the request buffers.
 */
typedef struct {
    /* Next byte to copy; advanced when a record fills up mid-piece. */
    u_char    *buf;
    /* Bytes left to copy from "buf"; zero requests the next piece. */
    uint32_t  len;
    /* Scratch space for an encoded name or value length (1 or 4 bytes). */
    u_char    length[4];
} nxt_fastcgi_param_t;
19 
20 
/*
 * Stores the low 16 bits of "length" at p[0] and p[1], most significant
 * byte first, i.e. fills the two-byte content length field of a FastCGI
 * record header.
 *
 * Each argument is parenthesized and evaluated exactly once.  The
 * temporaries have names that callers are unlikely to use, so passing
 * a variable called "len" or "p" cannot be captured by the expansion.
 */
#define                                                                       \
nxt_fastcgi_set_record_length(p, length)                                      \
    do {                                                                      \
        u_char    *nxt_rec_p_ = (p);                                          \
        uint32_t  nxt_rec_len_ = (length);                                    \
                                                                              \
        nxt_rec_p_[1] = (u_char) nxt_rec_len_;  nxt_rec_len_ >>= 8;           \
        nxt_rec_p_[0] = (u_char) nxt_rec_len_;                                \
    } while (0)
29 
30 
31 nxt_inline size_t
32 nxt_fastcgi_param_length(u_char *p, uint32_t length)
33 {
34     if (nxt_fast_path(length < 128)) {
35         *p = (u_char) length;
36         return 1;
37     }
38 
39     p[3] = (u_char) length;  length >>= 8;
40     p[2] = (u_char) length;  length >>= 8;
41     p[1] = (u_char) length;  length >>= 8;
42     p[0] = (u_char) (length | 0x80);
43 
44     return 4;
45 }
46 
47 
/* Request serialization. */
static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
    nxt_fastcgi_param_t *param);

/* Response filters: record demultiplexing and HTTP header parsing. */
static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
    void *data);
static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
    nxt_fastcgi_source_t *fs, nxt_buf_t *b);

/* Per-header processing and the handlers of specially treated headers. */
static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
    nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
    nxt_name_value_t *nv);
static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
    nxt_name_value_t *nv);

/* Body stage and failure reporting. */
static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
    nxt_buf_t *b);
static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
    void *data);
static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
static void nxt_fastcgi_source_error(nxt_task_t *task,
    nxt_stream_source_t *stream);
static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
76 
77 
78 /*
79  * A FastCGI request:
80  *   FCGI_BEGIN_REQUEST record;
81  *   Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
82  *   zero content length,
83  *   Several FCGI_STDIN records, the last FCGI_STDIN record must have
84  *   zero content length.
85  */
86 
/*
 * A complete, prebuilt BEGIN_REQUEST record: an 8-byte record header
 * followed by the 8-byte body, 16 bytes in total.  It is copied verbatim
 * to the start of the request by nxt_fastcgi_request_create().
 */
static const uint8_t  nxt_fastcgi_begin_request[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_BEGIN_REQUEST,         /* The BEGIN_REQUEST record type.     */
    0, 1,                              /* Request ID.                        */
    0, 8,                              /* Content length of the Role record. */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */

    0, NXT_FASTCGI_RESPONDER,          /* The Responder Role.                */
    0,                                 /* Flags.                             */
    0, 0, 0, 0, 0,                     /* Reserved.                          */
};
99 
100 
/*
 * Template of a PARAMS record header.  The content and padding lengths are
 * zero here; nxt_fastcgi_request_create() patches them in place once the
 * record content is known.  Copied unchanged, it is the empty PARAMS record
 * that ends the parameter stream.
 */
static const uint8_t  nxt_fastcgi_params_record[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_PARAMS,                /* The PARAMS record type.            */
    0, 1,                              /* Request ID.                        */
    0, 0,                              /* Content length.                    */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */
};
109 
110 
/*
 * An empty STDIN record header.  nxt_fastcgi_request_create() appends it
 * unchanged as the final record of the request, i.e. the request is sent
 * without a body.
 */
static const uint8_t  nxt_fastcgi_stdin_record[] = {
    1,                                 /* FastCGI version.                   */
    NXT_FASTCGI_STDIN,                 /* The STDIN record type.             */
    0, 1,                              /* Request ID.                        */
    0, 0,                              /* Content length.                    */
    0,                                 /* Padding length.                    */
    0,                                 /* Reserved.                          */
};
119 
120 
121 void
122 nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
123     nxt_fastcgi_source_request_create_t request_create)
124 {
125     nxt_stream_source_t   *stream;
126     nxt_fastcgi_source_t  *fs;
127 
128     fs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
129     if (nxt_slow_path(fs == NULL)) {
130         goto fail;
131     }
132 
133     us->protocol_source = fs;
134 
135     fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
136                                          sizeof(nxt_name_value_t));
137     if (nxt_slow_path(fs->header_in.list == NULL)) {
138         goto fail;
139     }
140 
141     fs->header_in.hash = us->header_hash;
142     fs->upstream = us;
143     fs->request_create = request_create;
144 
145     stream = us->stream;
146 
147     if (stream == NULL) {
148         stream = nxt_mem_zalloc(us->buffers.mem_pool,
149                                 sizeof(nxt_stream_source_t));
150         if (nxt_slow_path(stream == NULL)) {
151             goto fail;
152         }
153 
154         us->stream = stream;
155         stream->upstream = us;
156 
157     } else {
158         nxt_memzero(stream, sizeof(nxt_stream_source_t));
159     }
160 
161     /*
162      * Create the FastCGI source filter chain:
163      *   stream source | FastCGI record filter | FastCGI HTTP header filter
164      */
165     stream->next = &fs->query;
166     stream->error_handler = nxt_fastcgi_source_error;
167 
168     fs->record.next.context = fs;
169     fs->record.next.filter = nxt_fastcgi_source_header_filter;
170 
171     fs->record.parse.last_buf = nxt_fastcgi_source_last_buf;
172     fs->record.parse.data = fs;
173     fs->record.parse.mem_pool = us->buffers.mem_pool;
174 
175     fs->query.context = &fs->record.parse;
176     fs->query.filter = nxt_fastcgi_source_record_filter;
177 
178     fs->header_in.content_length = -1;
179 
180     stream->out = nxt_fastcgi_request_create(fs);
181 
182     if (nxt_fast_path(stream->out != NULL)) {
183         nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
184         fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
185 
186         nxt_stream_source_connect(task, stream);
187         return;
188     }
189 
190 fail:
191 
192     nxt_fastcgi_source_fail(task, fs);
193 }
194 
195 
196 static nxt_buf_t *
197 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
198 {
199     u_char               *p, *record_length;
200     size_t               len, size, max_record_size;
201     nxt_int_t            ret;
202     nxt_buf_t            *b, *req, **prev;
203     nxt_bool_t           begin_request;
204     nxt_fastcgi_param_t  param;
205 
206     nxt_thread_log_debug("fastcgi request");
207 
208     begin_request = 1;
209     param.len = 0;
210     prev = &req;
211 
212 new_buffer:
213 
214     ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0);
215     if (nxt_slow_path(ret != NXT_OK)) {
216         return NULL;
217     }
218 
219     b = fs->upstream->buffers.current;
220     fs->upstream->buffers.current = NULL;
221 
222     *prev = b;
223     prev = &b->next;
224 
225 new_record:
226 
227     size = b->mem.end - b->mem.free;
228     size = nxt_align_size(size, 8) - 8;
229     /* The maximal FastCGI record content size is 65535.  65528 is 64K - 8. */
230     max_record_size = nxt_min(65528, size);
231 
232     p = b->mem.free;
233 
234     if (begin_request) {
235         /* TODO: fastcgi keep conn in flags. */
236         p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16);
237         max_record_size -= 16;
238         begin_request = 0;
239     }
240 
241     b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8);
242     record_length = &p[4];
243     size = 0;
244 
245     for ( ;; ) {
246         if (param.len == 0) {
247             ret = nxt_fastcgi_next_param(fs, &param);
248 
249             if (nxt_slow_path(ret != NXT_OK)) {
250 
251                 if (nxt_slow_path(ret == NXT_ERROR)) {
252                     return NULL;
253                 }
254 
255                 /* ret == NXT_DONE */
256                 break;
257             }
258         }
259 
260         len = max_record_size;
261 
262         if (nxt_fast_path(len >= param.len)) {
263             len = param.len;
264             param.len = 0;
265 
266         } else {
267             param.len -= len;
268         }
269 
270         nxt_thread_log_debug("fastcgi copy len:%uz", len);
271 
272         b->mem.free = nxt_cpymem(b->mem.free, param.buf, len);
273 
274         size += len;
275         max_record_size -= len;
276 
277         if (nxt_slow_path(param.len != 0)) {
278             /* The record is full. */
279 
280             param.buf += len;
281 
282             nxt_thread_log_debug("fastcgi content size:%uz", size);
283 
284             nxt_fastcgi_set_record_length(record_length, size);
285 
286             /* The minimal size of aligned record with content is 16 bytes. */
287             if (b->mem.end - b->mem.free >= 16) {
288                 goto new_record;
289             }
290 
291             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
292                                  b->mem.pos);
293             goto new_buffer;
294         }
295     }
296 
297     nxt_thread_log_debug("fastcgi content size:%uz", size);
298 
299     nxt_fastcgi_set_record_length(record_length, size);
300 
301     /* A padding length. */
302     size = 8 - size % 8;
303     record_length[2] = (u_char) size;
304     nxt_memzero(b->mem.free, size);
305     b->mem.free += size;
306 
307     nxt_thread_log_debug("fastcgi padding:%uz", size);
308 
309     if (b->mem.end - b->mem.free < 16) {
310         nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
311 
312         b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0);
313         if (nxt_slow_path(b == NULL)) {
314             return NULL;
315         }
316 
317         *prev = b;
318         prev = &b->next;
319     }
320 
321     /* The end of FastCGI params. */
322     p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8);
323 
324     /* The end of FastCGI stdin. */
325     b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8);
326 
327     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
328 
329     return req;
330 }
331 
332 
333 static nxt_int_t
334 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
335 {
336     nxt_int_t  ret;
337 
338     enum {
339          sw_name_length = 0,
340          sw_value_length,
341          sw_name,
342          sw_value,
343     };
344 
345     switch (fs->state) {
346 
347     case sw_name_length:
348         ret = fs->request_create(fs);
349 
350         if (nxt_slow_path(ret != NXT_OK)) {
351             return ret;
352         }
353 
354         nxt_thread_log_debug("fastcgi param \"%V: %V\"",
355                              &fs->u.request.name, &fs->u.request.value);
356 
357         fs->state = sw_value_length;
358         param->buf = param->length;
359         param->len = nxt_fastcgi_param_length(param->length,
360                                               fs->u.request.name.len);
361         break;
362 
363     case sw_value_length:
364         fs->state = sw_name;
365         param->buf = param->length;
366         param->len = nxt_fastcgi_param_length(param->length,
367                                               fs->u.request.value.len);
368         break;
369 
370     case sw_name:
371         fs->state = sw_value;
372         param->buf = fs->u.request.name.data;
373         param->len = fs->u.request.name.len;
374         break;
375 
376     case sw_value:
377         fs->state = sw_name_length;
378         param->buf = fs->u.request.value.data;
379         param->len = fs->u.request.value.len;
380         break;
381     }
382 
383     return NXT_OK;
384 }
385 
386 
387 static void
388 nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
389 {
390     size_t                       size;
391     u_char                       *p;
392     nxt_buf_t                    *b, *in;
393     nxt_fastcgi_source_t         *fs;
394     nxt_fastcgi_source_record_t  *fsr;
395 
396     fsr = obj;
397     in = data;
398 
399     nxt_debug(task, "fastcgi source record filter");
400 
401     if (nxt_slow_path(fsr->parse.done)) {
402         return;
403     }
404 
405     nxt_fastcgi_record_parse(task, &fsr->parse, in);
406 
407     fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
408 
409     if (fsr->parse.error) {
410         nxt_fastcgi_source_fail(task, fs);
411         return;
412     }
413 
414     if (fsr->parse.fastcgi_error) {
415         /*
416          * Output all parsed before a FastCGI record error and close upstream.
417          */
418         nxt_thread_current_work_queue_add(task->thread,
419                                           nxt_fastcgi_source_record_error,
420                                           task, fs, NULL);
421     }
422 
423     /* Log FastCGI stderr output. */
424 
425     for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
426 
427         for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
428             if (*p != NXT_CR && *p != NXT_LF) {
429                 break;
430             }
431         }
432 
433         size = (p + 1) - b->mem.pos;
434 
435         if (size != 0) {
436             nxt_log(task, NXT_LOG_ERR,
437                     "upstream sent in FastCGI stderr: \"%*s\"",
438                     size, b->mem.pos);
439         }
440 
441         b->completion_handler(task, b, b->parent);
442     }
443 
444     /* Process FastCGI stdout output. */
445 
446     if (fsr->parse.out[0] != NULL) {
447         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
448                           &fsr->next, fsr->parse.out[0]);
449     }
450 }
451 
452 
453 static void
454 nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
455 {
456     nxt_fastcgi_source_t  *fs;
457 
458     fs = obj;
459 
460     nxt_fastcgi_source_fail(task, fs);
461 }
462 
463 
464 static void
465 nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
466 {
467     nxt_int_t             ret;
468     nxt_buf_t             *b;
469     nxt_fastcgi_source_t  *fs;
470 
471     fs = obj;
472     b = data;
473 
474     do {
475         nxt_debug(task, "fastcgi source header filter");
476 
477         if (nxt_slow_path(nxt_buf_is_sync(b))) {
478             nxt_fastcgi_source_sync_buffer(task, fs, b);
479             return;
480         }
481 
482         for ( ;; ) {
483             ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
484 
485             if (nxt_slow_path(ret != NXT_OK)) {
486                 break;
487             }
488 
489             ret = nxt_fastcgi_source_header_process(task, fs);
490 
491             if (nxt_slow_path(ret != NXT_OK)) {
492                 break;
493             }
494         }
495 
496         if (nxt_fast_path(ret == NXT_DONE)) {
497             nxt_debug(task, "fastcgi source header done");
498             nxt_fastcgi_source_header_ready(fs, b);
499             return;
500         }
501 
502         if (nxt_fast_path(ret != NXT_AGAIN)) {
503 
504             if (ret != NXT_ERROR) {
505                 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
506                 nxt_log(task, NXT_LOG_ERR,
507                         "upstream sent invalid header line: \"%*s\\r...\"",
508                         fs->u.header.parse.header_end
509                             - fs->u.header.parse.header_name_start,
510                         fs->u.header.parse.header_name_start);
511             }
512 
513             /* ret == NXT_ERROR */
514 
515             nxt_fastcgi_source_fail(task, fs);
516             return;
517         }
518 
519         b = b->next;
520 
521     } while (b != NULL);
522 }
523 
524 
525 static void
526 nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
527     nxt_buf_t *b)
528 {
529     if (nxt_buf_is_last(b)) {
530         nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
531 
532     } else {
533         nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
534                 "enough to process upstream response header",
535                 fs->upstream->buffers.max, fs->upstream->buffers.size);
536     }
537 
538     /* The stream source sends only the last and the nobuf sync buffer. */
539 
540     nxt_fastcgi_source_fail(task, fs);
541 }
542 
543 
544 static nxt_int_t
545 nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
546 {
547     size_t                     len;
548     nxt_name_value_t           *nv;
549     nxt_lvlhsh_query_t         lhq;
550     nxt_http_header_parse_t    *hp;
551     nxt_upstream_name_value_t  *unv;
552 
553     hp = &fs->u.header.parse;
554 
555     len = hp->header_name_end - hp->header_name_start;
556 
557     if (len > 255) {
558         nxt_log(task, NXT_LOG_INFO,
559                 "upstream sent too long header field name: \"%*s\"",
560                 len, hp->header_name_start);
561         return NXT_ERROR;
562     }
563 
564     nv = nxt_list_add(fs->header_in.list);
565     if (nxt_slow_path(nv == NULL)) {
566         return NXT_ERROR;
567     }
568 
569     nv->hash = hp->header_hash;
570     nv->skip = 0;
571     nv->name_len = len;
572     nv->name_start = hp->header_name_start;
573     nv->value_len = hp->header_end - hp->header_start;
574     nv->value_start = hp->header_start;
575 
576     nxt_debug(task, "http header: \"%*s: %*s\"",
577               nv->name_len, nv->name_start, nv->value_len, nv->value_start);
578 
579     lhq.key_hash = nv->hash;
580     lhq.key.len = nv->name_len;
581     lhq.key.data = nv->name_start;
582     lhq.proto = &nxt_upstream_header_hash_proto;
583 
584     if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
585         unv = lhq.value;
586 
587         if (unv->handler(fs->upstream, nv) == NXT_OK) {
588             return NXT_ERROR;
589         }
590     }
591 
592     return NXT_OK;
593 }
594 
595 
/*
 * Response header fields that get a dedicated handler.  The table is
 * registered in a header hash by nxt_fastcgi_source_hash_create(), and
 * nxt_fastcgi_source_header_process() calls the handler of a field that
 * it finds in the source's header hash.
 */
static const nxt_upstream_name_value_t  nxt_fastcgi_source_headers[]
    nxt_aligned(32) =
{
    { nxt_fastcgi_source_status,
      nxt_upstream_name_value("status") },

    { nxt_fastcgi_source_content_length,
      nxt_upstream_name_value("content-length") },
};
605 
606 
607 nxt_int_t
608 nxt_fastcgi_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
609 {
610     return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers,
611                                         nxt_nitems(nxt_fastcgi_source_headers));
612 }
613 
614 
615 static nxt_int_t
616 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv)
617 {
618     nxt_int_t             n;
619     nxt_str_t             s;
620     nxt_fastcgi_source_t  *fs;
621 
622     s.len = nv->value_len;
623     s.data = nv->value_start;
624 
625     n = nxt_str_int_parse(&s);
626 
627     if (nxt_fast_path(n > 0)) {
628         fs = us->protocol_source;
629         fs->header_in.status = n;
630         return NXT_OK;
631     }
632 
633     return NXT_ERROR;
634 }
635 
636 
637 static nxt_int_t
638 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
639     nxt_name_value_t *nv)
640 {
641     nxt_off_t             length;
642     nxt_fastcgi_source_t  *fs;
643 
644     length = nxt_off_t_parse(nv->value_start, nv->value_len);
645 
646     if (nxt_fast_path(length > 0)) {
647         fs = us->protocol_source;
648         fs->header_in.content_length = length;
649         return NXT_OK;
650     }
651 
652     return NXT_ERROR;
653 }
654 
655 
656 static void
657 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
658 {
659     /*
660      * Change the FastCGI source filter chain:
661      *   stream source | FastCGI record filter | FastCGI body filter
662      */
663     fs->record.next.filter = nxt_fastcgi_source_body_filter;
664 
665     if (nxt_buf_mem_used_size(&b->mem) != 0) {
666         fs->rest = b;
667     }
668 
669     if (fs->header_in.status == 0) {
670         /* The "200 OK" status by default. */
671         fs->header_in.status = 200;
672     }
673 
674     fs->upstream->state->ready_handler(fs);
675 }
676 
677 
678 /*
679  * The FastCGI source body filter accumulates first body buffers before the next
680  * filter will be established and sets completion handler for the last buffer.
681  */
682 
683 static void
684 nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
685 {
686     nxt_buf_t             *b, *in;
687     nxt_fastcgi_source_t  *fs;
688 
689     fs = obj;
690     in = data;
691 
692     nxt_debug(task, "fastcgi source body filter");
693 
694     for (b = in; b != NULL; b = b->next) {
695 
696         if (nxt_buf_is_last(b)) {
697             b->data = fs->upstream->data;
698             b->completion_handler = fs->upstream->state->completion_handler;
699         }
700     }
701 
702     if (fs->next != NULL) {
703         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
704                           fs->next, in);
705         return;
706     }
707 
708     nxt_buf_chain_add(&fs->rest, in);
709 }
710 
711 
712 static nxt_buf_t *
713 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
714 {
715     nxt_buf_t             *b;
716     nxt_fastcgi_source_t  *fs;
717 
718     fs = fp->data;
719 
720     b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST);
721 
722     if (nxt_fast_path(b != NULL)) {
723         b->data = fs->upstream->data;
724         b->completion_handler = fs->upstream->state->completion_handler;
725     }
726 
727     return b;
728 }
729 
730 
731 static void
732 nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
733 {
734     nxt_fastcgi_source_t  *fs;
735 
736     nxt_thread_log_debug("fastcgi source error");
737 
738     fs = stream->upstream->protocol_source;
739 
740     nxt_fastcgi_source_fail(task, fs);
741 }
742 
743 
744 static void
745 nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
746 {
747     nxt_debug(task, "fastcgi source fail");
748 
749     /* TODO: fail, next upstream, or bad gateway */
750 
751     fs->upstream->state->error_handler(task, fs, NULL);
752 }
753