xref: /unit/src/nxt_fastcgi_source.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * FastCGI protocol values used while building a request.
 * NXT_FASTCGI_RESPONDER is the role stored in the BEGIN_REQUEST record
 * template below.  NXT_FASTCGI_KEEP_CONN is not referenced by the code in
 * this file yet (presumably the BEGIN_REQUEST keep-connection flag, see the
 * TODO in nxt_fastcgi_request_create() -- confirm).
 */
10 #define NXT_FASTCGI_RESPONDER  1
11 #define NXT_FASTCGI_KEEP_CONN  1
12 
13 
/*
 * A cursor over one piece of a FastCGI name-value pair (an encoded length,
 * the name, or the value) while nxt_fastcgi_request_create() copies it into
 * PARAMS records, possibly across several records.
 */
14 typedef struct {
15     u_char    *buf;       /* The next bytes of the piece to copy. */
16     uint32_t  len;        /* How many bytes of the piece are still uncopied. */
17     u_char    length[4];  /* Storage for an encoded name or value length. */
18 } nxt_fastcgi_param_t;
19 
20 
/*
 * Stores the low 16 bits of "length" as a big-endian FastCGI record content
 * length into the two bytes addressed by "p".
 *
 * The arguments are parenthesized and the temporary has a name private to
 * the macro, so pointer expressions such as "hdr + 4" and a caller variable
 * named "len" can be passed safely.  "length" is evaluated once, "p" twice.
 */
#define                                                                       \
nxt_fastcgi_set_record_length(p, length)                                      \
    do {                                                                      \
        uint32_t  nxt_fcgi_record_len_ = (length);                            \
                                                                              \
        (p)[1] = (u_char) nxt_fcgi_record_len_;                               \
        (p)[0] = (u_char) (nxt_fcgi_record_len_ >> 8);                        \
    } while (0)
29 
30 
31 nxt_inline size_t
32 nxt_fastcgi_param_length(u_char *p, uint32_t length)
33 {
34     if (nxt_fast_path(length < 128)) {
35         *p = (u_char) length;
36         return 1;
37     }
38 
39     p[3] = (u_char) length;  length >>= 8;
40     p[2] = (u_char) length;  length >>= 8;
41     p[1] = (u_char) length;  length >>= 8;
42     p[0] = (u_char) (length | 0x80);
43 
44     return 4;
45 }
46 
47 
/* Building of the FastCGI request sent to the upstream. */
48 static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
49 static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
50     nxt_fastcgi_param_t *param);
51 
/* Filters of the response: FastCGI records, then the HTTP-like header. */
52 static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
53     void *data);
54 static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
55     void *data);
56 static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
57     void *data);
58 static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
59     nxt_fastcgi_source_t *fs, nxt_buf_t *b);
60 
/* Processing of a parsed header line and of the headers with handlers. */
61 static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
62     nxt_fastcgi_source_t *fs);
63 static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
64     nxt_name_value_t *nv);
65 static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
66     nxt_name_value_t *nv);
67 
/* The response body stage and the error paths. */
68 static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
69     nxt_buf_t *b);
70 static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
71     void *data);
72 static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
73 static void nxt_fastcgi_source_error(nxt_task_t *task,
74     nxt_stream_source_t *stream);
75 static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
76 
77 
78 /*
79  * A FastCGI request:
80  *   FCGI_BEGIN_REQUEST record;
81  *   Several FCGI_PARAMS records, the last FCGI_PARAMS record must have
82  *   zero content length,
83  *   Several FCGI_STDIN records, the last FCGI_STDIN record must have
84  *   zero content length.
85  */
86 
/*
 * The complete 16-byte BEGIN_REQUEST record: an 8-byte record header
 * followed by the 8-byte body.  nxt_fastcgi_request_create() copies it
 * verbatim to the start of the first request buffer.
 */
87 static const uint8_t  nxt_fastcgi_begin_request[] = {
88     1,                                 /* FastCGI version.                   */
89     NXT_FASTCGI_BEGIN_REQUEST,         /* The BEGIN_REQUEST record type.     */
90     0, 1,                              /* Request ID.                        */
91     0, 8,                              /* Content length of the Role record. */
92     0,                                 /* Padding length.                    */
93     0,                                 /* Reserved.                          */
94 
95     0, NXT_FASTCGI_RESPONDER,          /* The Responder Role.                */
96     0,                                 /* Flags.                             */
97     0, 0, 0, 0, 0,                     /* Reserved.                          */
98 };
99 
100 
/*
 * The 8-byte header template of a PARAMS record.
 * nxt_fastcgi_request_create() copies it in front of each run of parameters
 * and then patches the content and padding lengths in the copy; copied
 * unchanged, with zero content length, it terminates the parameters.
 */
101 static const uint8_t  nxt_fastcgi_params_record[] = {
102     1,                                 /* FastCGI version.                   */
103     NXT_FASTCGI_PARAMS,                /* The PARAMS record type.            */
104     0, 1,                              /* Request ID.                        */
105     0, 0,                              /* Content length.                    */
106     0,                                 /* Padding length.                    */
107     0,                                 /* Reserved.                          */
108 };
109 
110 
/*
 * The 8-byte header of an empty STDIN record.
 * nxt_fastcgi_request_create() appends it unchanged as the last record of
 * the request, i.e. the request is sent with no STDIN content.
 */
111 static const uint8_t  nxt_fastcgi_stdin_record[] = {
112     1,                                 /* FastCGI version.                   */
113     NXT_FASTCGI_STDIN,                 /* The STDIN record type.             */
114     0, 1,                              /* Request ID.                        */
115     0, 0,                              /* Content length.                    */
116     0,                                 /* Padding length.                    */
117     0,                                 /* Reserved.                          */
118 };
119 
120 
121 void
122 nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
123     nxt_fastcgi_source_request_create_t request_create)
124 {
125     nxt_stream_source_t   *stream;
126     nxt_fastcgi_source_t  *fs;
127 
128     fs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t));
129     if (nxt_slow_path(fs == NULL)) {
130         goto fail;
131     }
132 
133     us->protocol_source = fs;
134 
135     fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
136                                          sizeof(nxt_name_value_t));
137     if (nxt_slow_path(fs->header_in.list == NULL)) {
138         goto fail;
139     }
140 
141     fs->header_in.hash = us->header_hash;
142     fs->upstream = us;
143     fs->request_create = request_create;
144 
145     stream = us->stream;
146 
147     if (stream == NULL) {
148         stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t));
149         if (nxt_slow_path(stream == NULL)) {
150             goto fail;
151         }
152 
153         us->stream = stream;
154         stream->upstream = us;
155 
156     } else {
157         nxt_memzero(stream, sizeof(nxt_stream_source_t));
158     }
159 
160     /*
161      * Create the FastCGI source filter chain:
162      *   stream source | FastCGI record filter | FastCGI HTTP header filter
163      */
164     stream->next = &fs->query;
165     stream->error_handler = nxt_fastcgi_source_error;
166 
167     fs->record.next.context = fs;
168     fs->record.next.filter = nxt_fastcgi_source_header_filter;
169 
170     fs->record.parse.last_buf = nxt_fastcgi_source_last_buf;
171     fs->record.parse.data = fs;
172     fs->record.parse.mem_pool = us->buffers.mem_pool;
173 
174     fs->query.context = &fs->record.parse;
175     fs->query.filter = nxt_fastcgi_source_record_filter;
176 
177     fs->header_in.content_length = -1;
178 
179     stream->out = nxt_fastcgi_request_create(fs);
180 
181     if (nxt_fast_path(stream->out != NULL)) {
182         nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
183         fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
184 
185         nxt_stream_source_connect(task, stream);
186         return;
187     }
188 
189 fail:
190 
191     nxt_fastcgi_source_fail(task, fs);
192 }
193 
194 
195 static nxt_buf_t *
196 nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs)
197 {
198     u_char               *p, *record_length;
199     size_t               len, size, max_record_size;
200     nxt_int_t            ret;
201     nxt_buf_t            *b, *req, **prev;
202     nxt_bool_t           begin_request;
203     nxt_fastcgi_param_t  param;
204 
205     nxt_thread_log_debug("fastcgi request");
206 
207     begin_request = 1;
208     param.len = 0;
209     prev = &req;
210 
211 new_buffer:
212 
213     ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0);
214     if (nxt_slow_path(ret != NXT_OK)) {
215         return NULL;
216     }
217 
218     b = fs->upstream->buffers.current;
219     fs->upstream->buffers.current = NULL;
220 
221     *prev = b;
222     prev = &b->next;
223 
224 new_record:
225 
226     size = b->mem.end - b->mem.free;
227     size = nxt_align_size(size, 8) - 8;
228     /* The maximal FastCGI record content size is 65535.  65528 is 64K - 8. */
229     max_record_size = nxt_min(65528, size);
230 
231     p = b->mem.free;
232 
233     if (begin_request) {
234         /* TODO: fastcgi keep conn in flags. */
235         p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16);
236         max_record_size -= 16;
237         begin_request = 0;
238     }
239 
240     b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8);
241     record_length = &p[4];
242     size = 0;
243 
244     for ( ;; ) {
245         if (param.len == 0) {
246             ret = nxt_fastcgi_next_param(fs, &param);
247 
248             if (nxt_slow_path(ret != NXT_OK)) {
249 
250                 if (nxt_slow_path(ret == NXT_ERROR)) {
251                     return NULL;
252                 }
253 
254                 /* ret == NXT_DONE */
255                 break;
256             }
257         }
258 
259         len = max_record_size;
260 
261         if (nxt_fast_path(len >= param.len)) {
262             len = param.len;
263             param.len = 0;
264 
265         } else {
266             param.len -= len;
267         }
268 
269         nxt_thread_log_debug("fastcgi copy len:%uz", len);
270 
271         b->mem.free = nxt_cpymem(b->mem.free, param.buf, len);
272 
273         size += len;
274         max_record_size -= len;
275 
276         if (nxt_slow_path(param.len != 0)) {
277             /* The record is full. */
278 
279             param.buf += len;
280 
281             nxt_thread_log_debug("fastcgi content size:%uz", size);
282 
283             nxt_fastcgi_set_record_length(record_length, size);
284 
285             /* The minimal size of aligned record with content is 16 bytes. */
286             if (b->mem.end - b->mem.free >= 16) {
287                 goto new_record;
288             }
289 
290             nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
291                                  b->mem.pos);
292             goto new_buffer;
293         }
294     }
295 
296     nxt_thread_log_debug("fastcgi content size:%uz", size);
297 
298     nxt_fastcgi_set_record_length(record_length, size);
299 
300     /* A padding length. */
301     size = 8 - size % 8;
302     record_length[2] = (u_char) size;
303     nxt_memzero(b->mem.free, size);
304     b->mem.free += size;
305 
306     nxt_thread_log_debug("fastcgi padding:%uz", size);
307 
308     if (b->mem.end - b->mem.free < 16) {
309         nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
310 
311         b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0);
312         if (nxt_slow_path(b == NULL)) {
313             return NULL;
314         }
315 
316         *prev = b;
317         prev = &b->next;
318     }
319 
320     /* The end of FastCGI params. */
321     p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8);
322 
323     /* The end of FastCGI stdin. */
324     b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8);
325 
326     nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
327 
328     return req;
329 }
330 
331 
332 static nxt_int_t
333 nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
334 {
335     nxt_int_t  ret;
336 
337     enum {
338          sw_name_length = 0,
339          sw_value_length,
340          sw_name,
341          sw_value,
342     };
343 
344     switch (fs->state) {
345 
346     case sw_name_length:
347         ret = fs->request_create(fs);
348 
349         if (nxt_slow_path(ret != NXT_OK)) {
350             return ret;
351         }
352 
353         nxt_thread_log_debug("fastcgi param \"%V: %V\"",
354                              &fs->u.request.name, &fs->u.request.value);
355 
356         fs->state = sw_value_length;
357         param->buf = param->length;
358         param->len = nxt_fastcgi_param_length(param->length,
359                                               fs->u.request.name.len);
360         break;
361 
362     case sw_value_length:
363         fs->state = sw_name;
364         param->buf = param->length;
365         param->len = nxt_fastcgi_param_length(param->length,
366                                               fs->u.request.value.len);
367         break;
368 
369     case sw_name:
370         fs->state = sw_value;
371         param->buf = fs->u.request.name.data;
372         param->len = fs->u.request.name.len;
373         break;
374 
375     case sw_value:
376         fs->state = sw_name_length;
377         param->buf = fs->u.request.value.data;
378         param->len = fs->u.request.value.len;
379         break;
380     }
381 
382     return NXT_OK;
383 }
384 
385 
386 static void
387 nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
388 {
389     size_t                       size;
390     u_char                       *p;
391     nxt_buf_t                    *b, *in;
392     nxt_fastcgi_source_t         *fs;
393     nxt_fastcgi_source_record_t  *fsr;
394 
395     fsr = obj;
396     in = data;
397 
398     nxt_debug(task, "fastcgi source record filter");
399 
400     if (nxt_slow_path(fsr->parse.done)) {
401         return;
402     }
403 
404     nxt_fastcgi_record_parse(task, &fsr->parse, in);
405 
406     fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
407 
408     if (fsr->parse.error) {
409         nxt_fastcgi_source_fail(task, fs);
410         return;
411     }
412 
413     if (fsr->parse.fastcgi_error) {
414         /*
415          * Output all parsed before a FastCGI record error and close upstream.
416          */
417         nxt_thread_current_work_queue_add(task->thread,
418                                           nxt_fastcgi_source_record_error,
419                                           task, fs, NULL);
420     }
421 
422     /* Log FastCGI stderr output. */
423 
424     for (b = fsr->parse.out[1]; b != NULL; b = b->next) {
425 
426         for (p = b->mem.free - 1; p >= b->mem.pos; p--) {
427             if (*p != NXT_CR && *p != NXT_LF) {
428                 break;
429             }
430         }
431 
432         size = (p + 1) - b->mem.pos;
433 
434         if (size != 0) {
435             nxt_log(task, NXT_LOG_ERR,
436                     "upstream sent in FastCGI stderr: \"%*s\"",
437                     size, b->mem.pos);
438         }
439 
440         b->completion_handler(task, b, b->parent);
441     }
442 
443     /* Process FastCGI stdout output. */
444 
445     if (fsr->parse.out[0] != NULL) {
446         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
447                           &fsr->next, fsr->parse.out[0]);
448     }
449 }
450 
451 
452 static void
453 nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
454 {
455     nxt_fastcgi_source_t  *fs;
456 
457     fs = obj;
458 
459     nxt_fastcgi_source_fail(task, fs);
460 }
461 
462 
463 static void
464 nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
465 {
466     nxt_int_t             ret;
467     nxt_buf_t             *b;
468     nxt_fastcgi_source_t  *fs;
469 
470     fs = obj;
471     b = data;
472 
473     do {
474         nxt_debug(task, "fastcgi source header filter");
475 
476         if (nxt_slow_path(nxt_buf_is_sync(b))) {
477             nxt_fastcgi_source_sync_buffer(task, fs, b);
478             return;
479         }
480 
481         for ( ;; ) {
482             ret = nxt_http_split_header_parse(&fs->u.header, &b->mem);
483 
484             if (nxt_slow_path(ret != NXT_OK)) {
485                 break;
486             }
487 
488             ret = nxt_fastcgi_source_header_process(task, fs);
489 
490             if (nxt_slow_path(ret != NXT_OK)) {
491                 break;
492             }
493         }
494 
495         if (nxt_fast_path(ret == NXT_DONE)) {
496             nxt_debug(task, "fastcgi source header done");
497             nxt_fastcgi_source_header_ready(fs, b);
498             return;
499         }
500 
501         if (nxt_fast_path(ret != NXT_AGAIN)) {
502 
503             if (ret != NXT_ERROR) {
504                 /* n == NXT_DECLINED: "\r" is not followed by "\n" */
505                 nxt_log(task, NXT_LOG_ERR,
506                         "upstream sent invalid header line: \"%*s\\r...\"",
507                         fs->u.header.parse.header_end
508                             - fs->u.header.parse.header_name_start,
509                         fs->u.header.parse.header_name_start);
510             }
511 
512             /* ret == NXT_ERROR */
513 
514             nxt_fastcgi_source_fail(task, fs);
515             return;
516         }
517 
518         b = b->next;
519 
520     } while (b != NULL);
521 }
522 
523 
524 static void
525 nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
526     nxt_buf_t *b)
527 {
528     if (nxt_buf_is_last(b)) {
529         nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
530 
531     } else {
532         nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
533                 "enough to process upstream response header",
534                 fs->upstream->buffers.max, fs->upstream->buffers.size);
535     }
536 
537     /* The stream source sends only the last and the nobuf sync buffer. */
538 
539     nxt_fastcgi_source_fail(task, fs);
540 }
541 
542 
543 static nxt_int_t
544 nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
545 {
546     size_t                     len;
547     nxt_name_value_t           *nv;
548     nxt_lvlhsh_query_t         lhq;
549     nxt_http_header_parse_t    *hp;
550     nxt_upstream_name_value_t  *unv;
551 
552     hp = &fs->u.header.parse;
553 
554     len = hp->header_name_end - hp->header_name_start;
555 
556     if (len > 255) {
557         nxt_log(task, NXT_LOG_INFO,
558                 "upstream sent too long header field name: \"%*s\"",
559                 len, hp->header_name_start);
560         return NXT_ERROR;
561     }
562 
563     nv = nxt_list_add(fs->header_in.list);
564     if (nxt_slow_path(nv == NULL)) {
565         return NXT_ERROR;
566     }
567 
568     nv->hash = hp->header_hash;
569     nv->skip = 0;
570     nv->name_len = len;
571     nv->name_start = hp->header_name_start;
572     nv->value_len = hp->header_end - hp->header_start;
573     nv->value_start = hp->header_start;
574 
575     nxt_debug(task, "http header: \"%*s: %*s\"",
576               nv->name_len, nv->name_start, nv->value_len, nv->value_start);
577 
578     lhq.key_hash = nv->hash;
579     lhq.key.len = nv->name_len;
580     lhq.key.data = nv->name_start;
581     lhq.proto = &nxt_upstream_header_hash_proto;
582 
583     if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) {
584         unv = lhq.value;
585 
586         if (unv->handler(fs->upstream, nv) == NXT_OK) {
587             return NXT_ERROR;
588         }
589     }
590 
591     return NXT_OK;
592 }
593 
594 
/*
 * The response header fields which have handlers.  The table is added to
 * a header hash by nxt_fastcgi_source_hash_create().
 */
595 static const nxt_upstream_name_value_t  nxt_fastcgi_source_headers[]
596     nxt_aligned(32) =
597 {
598     { nxt_fastcgi_source_status,
599       nxt_upstream_name_value("status") },
600 
601     { nxt_fastcgi_source_content_length,
602       nxt_upstream_name_value("content-length") },
603 };
604 
605 
606 nxt_int_t
607 nxt_fastcgi_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh)
608 {
609     return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers,
610                                         nxt_nitems(nxt_fastcgi_source_headers));
611 }
612 
613 
614 static nxt_int_t
615 nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv)
616 {
617     nxt_int_t             n;
618     nxt_str_t             s;
619     nxt_fastcgi_source_t  *fs;
620 
621     s.len = nv->value_len;
622     s.data = nv->value_start;
623 
624     n = nxt_str_int_parse(&s);
625 
626     if (nxt_fast_path(n > 0)) {
627         fs = us->protocol_source;
628         fs->header_in.status = n;
629         return NXT_OK;
630     }
631 
632     return NXT_ERROR;
633 }
634 
635 
636 static nxt_int_t
637 nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
638     nxt_name_value_t *nv)
639 {
640     nxt_off_t             length;
641     nxt_fastcgi_source_t  *fs;
642 
643     length = nxt_off_t_parse(nv->value_start, nv->value_len);
644 
645     if (nxt_fast_path(length > 0)) {
646         fs = us->protocol_source;
647         fs->header_in.content_length = length;
648         return NXT_OK;
649     }
650 
651     return NXT_ERROR;
652 }
653 
654 
655 static void
656 nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
657 {
658     /*
659      * Change the FastCGI source filter chain:
660      *   stream source | FastCGI record filter | FastCGI body filter
661      */
662     fs->record.next.filter = nxt_fastcgi_source_body_filter;
663 
664     if (nxt_buf_mem_used_size(&b->mem) != 0) {
665         fs->rest = b;
666     }
667 
668     if (fs->header_in.status == 0) {
669         /* The "200 OK" status by default. */
670         fs->header_in.status = 200;
671     }
672 
673     fs->upstream->state->ready_handler(fs);
674 }
675 
676 
677 /*
678  * The FastCGI source body filter accumulates first body buffers before the next
679  * filter will be established and sets completion handler for the last buffer.
680  */
681 
682 static void
683 nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
684 {
685     nxt_buf_t             *b, *in;
686     nxt_fastcgi_source_t  *fs;
687 
688     fs = obj;
689     in = data;
690 
691     nxt_debug(task, "fastcgi source body filter");
692 
693     for (b = in; b != NULL; b = b->next) {
694 
695         if (nxt_buf_is_last(b)) {
696             b->data = fs->upstream->data;
697             b->completion_handler = fs->upstream->state->completion_handler;
698         }
699     }
700 
701     if (fs->next != NULL) {
702         nxt_source_filter(task->thread, fs->upstream->work_queue, task,
703                           fs->next, in);
704         return;
705     }
706 
707     nxt_buf_chain_add(&fs->rest, in);
708 }
709 
710 
711 static nxt_buf_t *
712 nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
713 {
714     nxt_buf_t             *b;
715     nxt_fastcgi_source_t  *fs;
716 
717     fs = fp->data;
718 
719     b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST);
720 
721     if (nxt_fast_path(b != NULL)) {
722         b->data = fs->upstream->data;
723         b->completion_handler = fs->upstream->state->completion_handler;
724     }
725 
726     return b;
727 }
728 
729 
730 static void
731 nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
732 {
733     nxt_fastcgi_source_t  *fs;
734 
735     nxt_thread_log_debug("fastcgi source error");
736 
737     fs = stream->upstream->protocol_source;
738 
739     nxt_fastcgi_source_fail(task, fs);
740 }
741 
742 
743 static void
744 nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
745 {
746     nxt_debug(task, "fastcgi source fail");
747 
748     /* TODO: fail, next upstream, or bad gateway */
749 
750     fs->upstream->state->error_handler(task, fs, NULL);
751 }
752