xref: /unit/src/nxt_buf_filter.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11 nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
12 static void nxt_buf_filter_file_read_start(nxt_thread_t *thr,
13     nxt_buf_filter_t *f);
14 static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f);
15 static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr,
16     void *obj, void *data);
17 static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj,
18     void *data);
19 static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj,
20     void *data);
21 
22 
23 void
24 nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b)
25 {
26     nxt_buf_chain_add(&f->input, b);
27 
28     nxt_buf_filter(thr, f, NULL);
29 }
30 
31 
32 void
33 nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
34 {
35     nxt_int_t         ret;
36     nxt_buf_t         *b;
37     nxt_buf_filter_t  *f;
38 
39     f = obj;
40 
41     nxt_log_debug(thr->log, "buf filter");
42 
43     if (f->done) {
44         return;
45     }
46 
47     f->queued = 0;
48 
49     for ( ;; ) {
50         /*
51          * f->input is a chain of original incoming buffers: memory,
52          *     mapped, file, and sync buffers;
53          * f->current is a currently processed memory buffer or a chain
54          *     of memory/file or mapped/file buffers which are read of
55          *     or populated from file;
56          * f->output is a chain of output buffers;
57          * f->last is the last output buffer in the chain.
58          */
59 
60         b = f->current;
61 
62         nxt_log_debug(thr->log, "buf filter current: %p", b);
63 
64         if (b == NULL) {
65 
66             if (f->reading) {
67                 return;
68             }
69 
70             b = f->input;
71 
72             nxt_log_debug(thr->log, "buf filter input: %p", b);
73 
74             if (b == NULL) {
75                 /*
76                  * The end of the input chain, pass
77                  * the output chain to the next filter.
78                  */
79                 nxt_buf_filter_next(f);
80 
81                 return;
82             }
83 
84             if (nxt_buf_is_mem(b)) {
85 
86                 f->current = b;
87                 f->input = b->next;
88                 b->next = NULL;
89 
90             } else if (nxt_buf_is_file(b)) {
91 
92                 if (f->run->filter_ready(f) != NXT_OK) {
93                     nxt_buf_filter_next(f);
94                 }
95 
96                 nxt_buf_filter_file_read_start(thr, f);
97                 return;
98             }
99         }
100 
101         if (nxt_buf_is_sync(b)) {
102 
103             ret = NXT_OK;
104             f->current = b;
105             f->input = b->next;
106             b->next = NULL;
107 
108             if (nxt_buf_is_nobuf(b)) {
109                 ret = f->run->filter_sync_nobuf(f);
110 
111             } else if (nxt_buf_is_flush(b)) {
112                 ret = f->run->filter_sync_flush(f);
113 
114             } else if (nxt_buf_is_last(b)) {
115                 ret = f->run->filter_sync_last(f);
116 
117                 f->done = (ret == NXT_OK);
118             }
119 
120             if (nxt_fast_path(ret == NXT_OK)) {
121                 continue;
122             }
123 
124             if (nxt_slow_path(ret == NXT_ERROR)) {
125                 goto fail;
126             }
127 
128             /* ret == NXT_AGAIN: No filter internal buffers available. */
129             goto nobuf;
130         }
131 
132         ret = f->run->filter_process(f);
133 
134         if (nxt_fast_path(ret == NXT_OK)) {
135             b = f->current;
136             /*
137              * A filter may just move f->current to f->output
138              * and then set f->current to NULL.
139              */
140             if (b != NULL && b->mem.pos == b->mem.free) {
141                 f->current = b->next;
142                 nxt_thread_work_queue_add(thr, f->work_queue,
143                                           b->completion_handler,
144                                           b, b->parent, thr->log);
145             }
146 
147             continue;
148         }
149 
150         if (nxt_slow_path(ret == NXT_ERROR)) {
151             goto fail;
152         }
153 
154         /* ret == NXT_AGAIN: No filter internal buffers available. */
155         goto nobuf;
156     }
157 
158 nobuf:
159 
160     /* ret == NXT_AGAIN: No filter internal buffers available. */
161 
162     if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163         return;
164     }
165 
166 fail:
167 
168     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
169                               f, f->data, thr->log);
170 }
171 
172 
173 static nxt_int_t
174 nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175 {
176     nxt_buf_t  *b;
177 
178     nxt_thread_log_debug("buf filter nobuf");
179 
180     b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
181 
182     if (nxt_fast_path(b != NULL)) {
183 
184         nxt_buf_chain_add(&f->output, b);
185         f->last = NULL;
186 
187         f->run->filter_next(f);
188 
189         f->output = NULL;
190 
191         return NXT_OK;
192     }
193 
194     return NXT_ERROR;
195 }
196 
197 
198 nxt_inline void
199 nxt_buf_filter_next(nxt_buf_filter_t *f)
200 {
201     if (f->output != NULL) {
202         f->last = NULL;
203 
204         f->run->filter_next(f);
205         f->output = NULL;
206     }
207 }
208 
209 
210 void
211 nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f)
212 {
213     nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued);
214 
215     if (!f->queued && !f->done) {
216         f->queued = 1;
217         nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter,
218                                   f, NULL, thr->log);
219     }
220 }
221 
222 
223 static void
224 nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
225 {
226     nxt_job_file_t         *jbf;
227     nxt_buf_filter_file_t  *ff;
228 
229     ff = f->run->job_file_create(f);
230 
231     if (nxt_slow_path(ff == NULL)) {
232         nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
233                                   f, f->data, thr->log);
234         return;
235     }
236 
237     f->filter_file = ff;
238 
239     jbf = &ff->job_file;
240     jbf->file = *f->input->file;
241 
242     jbf->ready_handler = nxt_buf_filter_file_job_completion;
243     jbf->error_handler = nxt_buf_filter_file_read_error;
244 
245     nxt_job_set_name(&jbf->job, "buf filter job file");
246 
247     f->reading = 1;
248 
249     nxt_buf_filter_file_read(thr, f);
250 }
251 
252 
253 static void
254 nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
255 {
256     nxt_int_t              ret;
257     nxt_off_t              size;
258     nxt_buf_t              *b;
259     nxt_buf_filter_file_t  *ff;
260 
261     ff = f->filter_file;
262 
263     if (ff->job_file.buffer != NULL) {
264         /* File is now being read. */
265         return;
266     }
267 
268     size = f->input->file_end - f->input->file_pos;
269 
270     if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
271         /*
272          * Small size value is a hint for buffer pool allocation
273          * size, but if size of the size_t type is lesser than size
274          * of the nxt_off_t type, the large size value may be truncated,
275          * so use a default buffer pool allocation size.
276          */
277         size = 0;
278     }
279 
280     if (f->mmap) {
281         ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
282 
283     } else {
284         ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
285     }
286 
287     if (nxt_fast_path(ret == NXT_OK)) {
288         b = ff->buffers.current;
289 
290         b->file_pos = f->input->file_pos;
291         b->file_end = f->input->file_pos;
292         b->file = f->input->file;
293 
294         ff->job_file.buffer = b;
295         ff->job_file.offset = f->input->file_pos;
296 
297         f->run->job_file_retain(f);
298 
299         nxt_job_file_read(thr, &ff->job_file.job);
300         return;
301     }
302 
303     if (nxt_fast_path(ret != NXT_ERROR)) {
304 
305         /* ret == NXT_AGAIN: No buffers available. */
306 
307         if (f->buffering) {
308             f->buffering = 0;
309 
310             if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
311                 return;
312             }
313 
314         } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
315             return;
316         }
317     }
318 
319     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
320                               f, f->data, thr->log);
321 }
322 
323 
324 typedef struct {
325     nxt_buf_filter_t  *filter;
326     nxt_buf_t         *buf;
327 } nxt_buf_filter_ctx_t;
328 
329 
330 static void
331 nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
332 {
333     nxt_buf_t             *b;
334     nxt_bool_t            done;
335     nxt_job_file_t        *jbf;
336     nxt_buf_filter_t      *f;
337     nxt_buf_filter_ctx_t  *ctx;
338 
339     jbf = obj;
340     f = data;
341     b = jbf->buffer;
342     jbf->buffer = NULL;
343 
344     nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O",
345                   jbf->file.name, b->file_pos, b->file_end);
346 
347     f->run->job_file_release(f);
348 
349     ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
350     if (nxt_slow_path(ctx == NULL)) {
351         goto fail;
352     }
353 
354     ctx->filter = f;
355     ctx->buf = f->input;
356 
357     f->input->file_pos = b->file_end;
358 
359     done = (f->input->file_pos == f->input->file_end);
360 
361     if (done) {
362         f->input = f->input->next;
363         f->reading = 0;
364     }
365 
366     b->data = f->data;
367     b->completion_handler = nxt_buf_filter_buf_completion;
368     b->parent = (nxt_buf_t *) ctx;
369     b->next = NULL;
370 
371     nxt_buf_chain_add(&f->current, b);
372 
373     nxt_buf_filter(thr, f, NULL);
374 
375     if (b->mem.pos == b->mem.free) {
376         /*
377          * The buffer has been completely processed by nxt_buf_filter(),
378          * its completion handler has been placed in workqueue and
379          * nxt_buf_filter_buf_completion() should be eventually called.
380          */
381         return;
382     }
383 
384     if (!done) {
385         /* Try to allocate another buffer and read the next file part. */
386         nxt_buf_filter_file_read(thr, f);
387     }
388 
389     return;
390 
391 fail:
392 
393     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
394                               f, f->data, thr->log);
395 }
396 
397 
398 static void
399 nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
400 {
401     nxt_buf_t             *fb, *b;
402     nxt_buf_filter_t      *f;
403     nxt_buf_filter_ctx_t  *ctx;
404 
405     b = obj;
406     ctx = data;
407     f = ctx->filter;
408 
409     nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O",
410                   b, f->filter_file->job_file.file.name,
411                   b->file_pos, b->file_end);
412 
413     /* nxt_http_send_filter() might clear a buffer's file status. */
414     b->is_file = 1;
415 
416     fb = ctx->buf;
417 
418     nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t));
419     nxt_buf_pool_free(&f->filter_file->buffers, b);
420 
421     if (fb->file_pos < fb->file_end) {
422         nxt_buf_filter_file_read(thr, f);
423         return;
424     }
425 
426     if (b->file_end == fb->file_end) {
427         nxt_buf_pool_destroy(&f->filter_file->buffers);
428 
429         nxt_job_destroy(&f->filter_file->job_file.job);
430 
431         nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler,
432                                   fb, fb->parent, thr->log);
433     }
434 
435     nxt_buf_filter(thr, f, NULL);
436 }
437 
438 
439 static void
440 nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data)
441 {
442     nxt_buf_filter_t  *f;
443 
444     f = data;
445 
446     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
447                               f, f->data, thr->log);
448 }
449