xref: /unit/src/nxt_buf_filter.c (revision 0)
1*0Sigor@sysoev.ru 
2*0Sigor@sysoev.ru /*
3*0Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*0Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*0Sigor@sysoev.ru  */
6*0Sigor@sysoev.ru 
7*0Sigor@sysoev.ru #include <nxt_main.h>
8*0Sigor@sysoev.ru 
9*0Sigor@sysoev.ru 
10*0Sigor@sysoev.ru static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11*0Sigor@sysoev.ru nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
12*0Sigor@sysoev.ru static void nxt_buf_filter_file_read_start(nxt_thread_t *thr,
13*0Sigor@sysoev.ru     nxt_buf_filter_t *f);
14*0Sigor@sysoev.ru static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f);
15*0Sigor@sysoev.ru static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr,
16*0Sigor@sysoev.ru     void *obj, void *data);
17*0Sigor@sysoev.ru static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj,
18*0Sigor@sysoev.ru     void *data);
19*0Sigor@sysoev.ru static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj,
20*0Sigor@sysoev.ru     void *data);
21*0Sigor@sysoev.ru 
22*0Sigor@sysoev.ru 
23*0Sigor@sysoev.ru void
24*0Sigor@sysoev.ru nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b)
25*0Sigor@sysoev.ru {
26*0Sigor@sysoev.ru     nxt_buf_chain_add(&f->input, b);
27*0Sigor@sysoev.ru 
28*0Sigor@sysoev.ru     nxt_buf_filter(thr, f, NULL);
29*0Sigor@sysoev.ru }
30*0Sigor@sysoev.ru 
31*0Sigor@sysoev.ru 
32*0Sigor@sysoev.ru void
33*0Sigor@sysoev.ru nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
34*0Sigor@sysoev.ru {
35*0Sigor@sysoev.ru     nxt_int_t         ret;
36*0Sigor@sysoev.ru     nxt_buf_t         *b;
37*0Sigor@sysoev.ru     nxt_buf_filter_t  *f;
38*0Sigor@sysoev.ru 
39*0Sigor@sysoev.ru     f = obj;
40*0Sigor@sysoev.ru 
41*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "buf filter");
42*0Sigor@sysoev.ru 
43*0Sigor@sysoev.ru     if (f->done) {
44*0Sigor@sysoev.ru         return;
45*0Sigor@sysoev.ru     }
46*0Sigor@sysoev.ru 
47*0Sigor@sysoev.ru     f->queued = 0;
48*0Sigor@sysoev.ru 
49*0Sigor@sysoev.ru     for ( ;; ) {
50*0Sigor@sysoev.ru         /*
51*0Sigor@sysoev.ru          * f->input is a chain of original incoming buffers: memory,
52*0Sigor@sysoev.ru          *     mapped, file, and sync buffers;
53*0Sigor@sysoev.ru          * f->current is a currently processed memory buffer or a chain
54*0Sigor@sysoev.ru          *     of memory/file or mapped/file buffers which are read of
55*0Sigor@sysoev.ru          *     or populated from file;
56*0Sigor@sysoev.ru          * f->output is a chain of output buffers;
57*0Sigor@sysoev.ru          * f->last is the last output buffer in the chain.
58*0Sigor@sysoev.ru          */
59*0Sigor@sysoev.ru 
60*0Sigor@sysoev.ru         b = f->current;
61*0Sigor@sysoev.ru 
62*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "buf filter current: %p", b);
63*0Sigor@sysoev.ru 
64*0Sigor@sysoev.ru         if (b == NULL) {
65*0Sigor@sysoev.ru 
66*0Sigor@sysoev.ru             if (f->reading) {
67*0Sigor@sysoev.ru                 return;
68*0Sigor@sysoev.ru             }
69*0Sigor@sysoev.ru 
70*0Sigor@sysoev.ru             b = f->input;
71*0Sigor@sysoev.ru 
72*0Sigor@sysoev.ru             nxt_log_debug(thr->log, "buf filter input: %p", b);
73*0Sigor@sysoev.ru 
74*0Sigor@sysoev.ru             if (b == NULL) {
75*0Sigor@sysoev.ru                 /*
76*0Sigor@sysoev.ru                  * The end of the input chain, pass
77*0Sigor@sysoev.ru                  * the output chain to the next filter.
78*0Sigor@sysoev.ru                  */
79*0Sigor@sysoev.ru                 nxt_buf_filter_next(f);
80*0Sigor@sysoev.ru 
81*0Sigor@sysoev.ru                 return;
82*0Sigor@sysoev.ru             }
83*0Sigor@sysoev.ru 
84*0Sigor@sysoev.ru             if (nxt_buf_is_mem(b)) {
85*0Sigor@sysoev.ru 
86*0Sigor@sysoev.ru                 f->current = b;
87*0Sigor@sysoev.ru                 f->input = b->next;
88*0Sigor@sysoev.ru                 b->next = NULL;
89*0Sigor@sysoev.ru 
90*0Sigor@sysoev.ru             } else if (nxt_buf_is_file(b)) {
91*0Sigor@sysoev.ru 
92*0Sigor@sysoev.ru                 if (f->run->filter_ready(f) != NXT_OK) {
93*0Sigor@sysoev.ru                     nxt_buf_filter_next(f);
94*0Sigor@sysoev.ru                 }
95*0Sigor@sysoev.ru 
96*0Sigor@sysoev.ru                 nxt_buf_filter_file_read_start(thr, f);
97*0Sigor@sysoev.ru                 return;
98*0Sigor@sysoev.ru             }
99*0Sigor@sysoev.ru         }
100*0Sigor@sysoev.ru 
101*0Sigor@sysoev.ru         if (nxt_buf_is_sync(b)) {
102*0Sigor@sysoev.ru 
103*0Sigor@sysoev.ru             ret = NXT_OK;
104*0Sigor@sysoev.ru             f->current = b;
105*0Sigor@sysoev.ru             f->input = b->next;
106*0Sigor@sysoev.ru             b->next = NULL;
107*0Sigor@sysoev.ru 
108*0Sigor@sysoev.ru             if (nxt_buf_is_nobuf(b)) {
109*0Sigor@sysoev.ru                 ret = f->run->filter_sync_nobuf(f);
110*0Sigor@sysoev.ru 
111*0Sigor@sysoev.ru             } else if (nxt_buf_is_flush(b)) {
112*0Sigor@sysoev.ru                 ret = f->run->filter_sync_flush(f);
113*0Sigor@sysoev.ru 
114*0Sigor@sysoev.ru             } else if (nxt_buf_is_last(b)) {
115*0Sigor@sysoev.ru                 ret = f->run->filter_sync_last(f);
116*0Sigor@sysoev.ru 
117*0Sigor@sysoev.ru                 f->done = (ret == NXT_OK);
118*0Sigor@sysoev.ru             }
119*0Sigor@sysoev.ru 
120*0Sigor@sysoev.ru             if (nxt_fast_path(ret == NXT_OK)) {
121*0Sigor@sysoev.ru                 continue;
122*0Sigor@sysoev.ru             }
123*0Sigor@sysoev.ru 
124*0Sigor@sysoev.ru             if (nxt_slow_path(ret == NXT_ERROR)) {
125*0Sigor@sysoev.ru                 goto fail;
126*0Sigor@sysoev.ru             }
127*0Sigor@sysoev.ru 
128*0Sigor@sysoev.ru             /* ret == NXT_AGAIN: No filter internal buffers available. */
129*0Sigor@sysoev.ru             goto nobuf;
130*0Sigor@sysoev.ru         }
131*0Sigor@sysoev.ru 
132*0Sigor@sysoev.ru         ret = f->run->filter_process(f);
133*0Sigor@sysoev.ru 
134*0Sigor@sysoev.ru         if (nxt_fast_path(ret == NXT_OK)) {
135*0Sigor@sysoev.ru             b = f->current;
136*0Sigor@sysoev.ru             /*
137*0Sigor@sysoev.ru              * A filter may just move f->current to f->output
138*0Sigor@sysoev.ru              * and then set f->current to NULL.
139*0Sigor@sysoev.ru              */
140*0Sigor@sysoev.ru             if (b != NULL && b->mem.pos == b->mem.free) {
141*0Sigor@sysoev.ru                 f->current = b->next;
142*0Sigor@sysoev.ru                 nxt_thread_work_queue_add(thr, f->work_queue,
143*0Sigor@sysoev.ru                                           b->completion_handler,
144*0Sigor@sysoev.ru                                           b, b->parent, thr->log);
145*0Sigor@sysoev.ru             }
146*0Sigor@sysoev.ru 
147*0Sigor@sysoev.ru             continue;
148*0Sigor@sysoev.ru         }
149*0Sigor@sysoev.ru 
150*0Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
151*0Sigor@sysoev.ru             goto fail;
152*0Sigor@sysoev.ru         }
153*0Sigor@sysoev.ru 
154*0Sigor@sysoev.ru         /* ret == NXT_AGAIN: No filter internal buffers available. */
155*0Sigor@sysoev.ru         goto nobuf;
156*0Sigor@sysoev.ru     }
157*0Sigor@sysoev.ru 
158*0Sigor@sysoev.ru nobuf:
159*0Sigor@sysoev.ru 
160*0Sigor@sysoev.ru     /* ret == NXT_AGAIN: No filter internal buffers available. */
161*0Sigor@sysoev.ru 
162*0Sigor@sysoev.ru     if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163*0Sigor@sysoev.ru         return;
164*0Sigor@sysoev.ru     }
165*0Sigor@sysoev.ru 
166*0Sigor@sysoev.ru fail:
167*0Sigor@sysoev.ru 
168*0Sigor@sysoev.ru     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
169*0Sigor@sysoev.ru                               f, f->data, thr->log);
170*0Sigor@sysoev.ru }
171*0Sigor@sysoev.ru 
172*0Sigor@sysoev.ru 
173*0Sigor@sysoev.ru static nxt_int_t
174*0Sigor@sysoev.ru nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175*0Sigor@sysoev.ru {
176*0Sigor@sysoev.ru     nxt_buf_t  *b;
177*0Sigor@sysoev.ru 
178*0Sigor@sysoev.ru     nxt_thread_log_debug("buf filter nobuf");
179*0Sigor@sysoev.ru 
180*0Sigor@sysoev.ru     b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
181*0Sigor@sysoev.ru 
182*0Sigor@sysoev.ru     if (nxt_fast_path(b != NULL)) {
183*0Sigor@sysoev.ru 
184*0Sigor@sysoev.ru         nxt_buf_chain_add(&f->output, b);
185*0Sigor@sysoev.ru         f->last = NULL;
186*0Sigor@sysoev.ru 
187*0Sigor@sysoev.ru         f->run->filter_next(f);
188*0Sigor@sysoev.ru 
189*0Sigor@sysoev.ru         f->output = NULL;
190*0Sigor@sysoev.ru 
191*0Sigor@sysoev.ru         return NXT_OK;
192*0Sigor@sysoev.ru     }
193*0Sigor@sysoev.ru 
194*0Sigor@sysoev.ru     return NXT_ERROR;
195*0Sigor@sysoev.ru }
196*0Sigor@sysoev.ru 
197*0Sigor@sysoev.ru 
198*0Sigor@sysoev.ru nxt_inline void
199*0Sigor@sysoev.ru nxt_buf_filter_next(nxt_buf_filter_t *f)
200*0Sigor@sysoev.ru {
201*0Sigor@sysoev.ru     if (f->output != NULL) {
202*0Sigor@sysoev.ru         f->last = NULL;
203*0Sigor@sysoev.ru 
204*0Sigor@sysoev.ru         f->run->filter_next(f);
205*0Sigor@sysoev.ru         f->output = NULL;
206*0Sigor@sysoev.ru     }
207*0Sigor@sysoev.ru }
208*0Sigor@sysoev.ru 
209*0Sigor@sysoev.ru 
210*0Sigor@sysoev.ru void
211*0Sigor@sysoev.ru nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f)
212*0Sigor@sysoev.ru {
213*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued);
214*0Sigor@sysoev.ru 
215*0Sigor@sysoev.ru     if (!f->queued && !f->done) {
216*0Sigor@sysoev.ru         f->queued = 1;
217*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter,
218*0Sigor@sysoev.ru                                   f, NULL, thr->log);
219*0Sigor@sysoev.ru     }
220*0Sigor@sysoev.ru }
221*0Sigor@sysoev.ru 
222*0Sigor@sysoev.ru 
223*0Sigor@sysoev.ru static void
224*0Sigor@sysoev.ru nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
225*0Sigor@sysoev.ru {
226*0Sigor@sysoev.ru     nxt_job_file_t         *jbf;
227*0Sigor@sysoev.ru     nxt_buf_filter_file_t  *ff;
228*0Sigor@sysoev.ru 
229*0Sigor@sysoev.ru     ff = f->run->job_file_create(f);
230*0Sigor@sysoev.ru 
231*0Sigor@sysoev.ru     if (nxt_slow_path(ff == NULL)) {
232*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
233*0Sigor@sysoev.ru                                   f, f->data, thr->log);
234*0Sigor@sysoev.ru         return;
235*0Sigor@sysoev.ru     }
236*0Sigor@sysoev.ru 
237*0Sigor@sysoev.ru     f->filter_file = ff;
238*0Sigor@sysoev.ru 
239*0Sigor@sysoev.ru     jbf = &ff->job_file;
240*0Sigor@sysoev.ru     jbf->file = *f->input->file;
241*0Sigor@sysoev.ru 
242*0Sigor@sysoev.ru     jbf->ready_handler = nxt_buf_filter_file_job_completion;
243*0Sigor@sysoev.ru     jbf->error_handler = nxt_buf_filter_file_read_error;
244*0Sigor@sysoev.ru 
245*0Sigor@sysoev.ru     nxt_job_set_name(&jbf->job, "buf filter job file");
246*0Sigor@sysoev.ru 
247*0Sigor@sysoev.ru     f->reading = 1;
248*0Sigor@sysoev.ru 
249*0Sigor@sysoev.ru     nxt_buf_filter_file_read(thr, f);
250*0Sigor@sysoev.ru }
251*0Sigor@sysoev.ru 
252*0Sigor@sysoev.ru 
253*0Sigor@sysoev.ru static void
254*0Sigor@sysoev.ru nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
255*0Sigor@sysoev.ru {
256*0Sigor@sysoev.ru     nxt_int_t              ret;
257*0Sigor@sysoev.ru     nxt_off_t              size;
258*0Sigor@sysoev.ru     nxt_buf_t              *b;
259*0Sigor@sysoev.ru     nxt_buf_filter_file_t  *ff;
260*0Sigor@sysoev.ru 
261*0Sigor@sysoev.ru     ff = f->filter_file;
262*0Sigor@sysoev.ru 
263*0Sigor@sysoev.ru     if (ff->job_file.buffer != NULL) {
264*0Sigor@sysoev.ru         /* File is now being read. */
265*0Sigor@sysoev.ru         return;
266*0Sigor@sysoev.ru     }
267*0Sigor@sysoev.ru 
268*0Sigor@sysoev.ru     size = f->input->file_end - f->input->file_pos;
269*0Sigor@sysoev.ru 
270*0Sigor@sysoev.ru     if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
271*0Sigor@sysoev.ru         /*
272*0Sigor@sysoev.ru          * Small size value is a hint for buffer pool allocation
273*0Sigor@sysoev.ru          * size, but if size of the size_t type is lesser than size
274*0Sigor@sysoev.ru          * of the nxt_off_t type, the large size value may be truncated,
275*0Sigor@sysoev.ru          * so use a default buffer pool allocation size.
276*0Sigor@sysoev.ru          */
277*0Sigor@sysoev.ru         size = 0;
278*0Sigor@sysoev.ru     }
279*0Sigor@sysoev.ru 
280*0Sigor@sysoev.ru     if (f->mmap) {
281*0Sigor@sysoev.ru         ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
282*0Sigor@sysoev.ru 
283*0Sigor@sysoev.ru     } else {
284*0Sigor@sysoev.ru         ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
285*0Sigor@sysoev.ru     }
286*0Sigor@sysoev.ru 
287*0Sigor@sysoev.ru     if (nxt_fast_path(ret == NXT_OK)) {
288*0Sigor@sysoev.ru         b = ff->buffers.current;
289*0Sigor@sysoev.ru 
290*0Sigor@sysoev.ru         b->file_pos = f->input->file_pos;
291*0Sigor@sysoev.ru         b->file_end = f->input->file_pos;
292*0Sigor@sysoev.ru         b->file = f->input->file;
293*0Sigor@sysoev.ru 
294*0Sigor@sysoev.ru         ff->job_file.buffer = b;
295*0Sigor@sysoev.ru         ff->job_file.offset = f->input->file_pos;
296*0Sigor@sysoev.ru 
297*0Sigor@sysoev.ru         f->run->job_file_retain(f);
298*0Sigor@sysoev.ru 
299*0Sigor@sysoev.ru         nxt_job_file_read(thr, &ff->job_file.job);
300*0Sigor@sysoev.ru         return;
301*0Sigor@sysoev.ru     }
302*0Sigor@sysoev.ru 
303*0Sigor@sysoev.ru     if (nxt_fast_path(ret != NXT_ERROR)) {
304*0Sigor@sysoev.ru 
305*0Sigor@sysoev.ru         /* ret == NXT_AGAIN: No buffers available. */
306*0Sigor@sysoev.ru 
307*0Sigor@sysoev.ru         if (f->buffering) {
308*0Sigor@sysoev.ru             f->buffering = 0;
309*0Sigor@sysoev.ru 
310*0Sigor@sysoev.ru             if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
311*0Sigor@sysoev.ru                 return;
312*0Sigor@sysoev.ru             }
313*0Sigor@sysoev.ru 
314*0Sigor@sysoev.ru         } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
315*0Sigor@sysoev.ru             return;
316*0Sigor@sysoev.ru         }
317*0Sigor@sysoev.ru     }
318*0Sigor@sysoev.ru 
319*0Sigor@sysoev.ru     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
320*0Sigor@sysoev.ru                               f, f->data, thr->log);
321*0Sigor@sysoev.ru }
322*0Sigor@sysoev.ru 
323*0Sigor@sysoev.ru 
324*0Sigor@sysoev.ru typedef struct {
325*0Sigor@sysoev.ru     nxt_buf_filter_t  *filter;
326*0Sigor@sysoev.ru     nxt_buf_t         *buf;
327*0Sigor@sysoev.ru } nxt_buf_filter_ctx_t;
328*0Sigor@sysoev.ru 
329*0Sigor@sysoev.ru 
330*0Sigor@sysoev.ru static void
331*0Sigor@sysoev.ru nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
332*0Sigor@sysoev.ru {
333*0Sigor@sysoev.ru     nxt_buf_t             *b;
334*0Sigor@sysoev.ru     nxt_bool_t            done;
335*0Sigor@sysoev.ru     nxt_job_file_t        *jbf;
336*0Sigor@sysoev.ru     nxt_buf_filter_t      *f;
337*0Sigor@sysoev.ru     nxt_buf_filter_ctx_t  *ctx;
338*0Sigor@sysoev.ru 
339*0Sigor@sysoev.ru     jbf = obj;
340*0Sigor@sysoev.ru     f = data;
341*0Sigor@sysoev.ru     b = jbf->buffer;
342*0Sigor@sysoev.ru     jbf->buffer = NULL;
343*0Sigor@sysoev.ru 
344*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O",
345*0Sigor@sysoev.ru                   jbf->file.name, b->file_pos, b->file_end);
346*0Sigor@sysoev.ru 
347*0Sigor@sysoev.ru     f->run->job_file_release(f);
348*0Sigor@sysoev.ru 
349*0Sigor@sysoev.ru     ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
350*0Sigor@sysoev.ru     if (nxt_slow_path(ctx == NULL)) {
351*0Sigor@sysoev.ru         goto fail;
352*0Sigor@sysoev.ru     }
353*0Sigor@sysoev.ru 
354*0Sigor@sysoev.ru     ctx->filter = f;
355*0Sigor@sysoev.ru     ctx->buf = f->input;
356*0Sigor@sysoev.ru 
357*0Sigor@sysoev.ru     f->input->file_pos = b->file_end;
358*0Sigor@sysoev.ru 
359*0Sigor@sysoev.ru     done = (f->input->file_pos == f->input->file_end);
360*0Sigor@sysoev.ru 
361*0Sigor@sysoev.ru     if (done) {
362*0Sigor@sysoev.ru         f->input = f->input->next;
363*0Sigor@sysoev.ru         f->reading = 0;
364*0Sigor@sysoev.ru     }
365*0Sigor@sysoev.ru 
366*0Sigor@sysoev.ru     b->data = f->data;
367*0Sigor@sysoev.ru     b->completion_handler = nxt_buf_filter_buf_completion;
368*0Sigor@sysoev.ru     b->parent = (nxt_buf_t *) ctx;
369*0Sigor@sysoev.ru     b->next = NULL;
370*0Sigor@sysoev.ru 
371*0Sigor@sysoev.ru     nxt_buf_chain_add(&f->current, b);
372*0Sigor@sysoev.ru 
373*0Sigor@sysoev.ru     nxt_buf_filter(thr, f, NULL);
374*0Sigor@sysoev.ru 
375*0Sigor@sysoev.ru     if (b->mem.pos == b->mem.free) {
376*0Sigor@sysoev.ru         /*
377*0Sigor@sysoev.ru          * The buffer has been completely processed by nxt_buf_filter(),
378*0Sigor@sysoev.ru          * its completion handler has been placed in workqueue and
379*0Sigor@sysoev.ru          * nxt_buf_filter_buf_completion() should be eventually called.
380*0Sigor@sysoev.ru          */
381*0Sigor@sysoev.ru         return;
382*0Sigor@sysoev.ru     }
383*0Sigor@sysoev.ru 
384*0Sigor@sysoev.ru     if (!done) {
385*0Sigor@sysoev.ru         /* Try to allocate another buffer and read the next file part. */
386*0Sigor@sysoev.ru         nxt_buf_filter_file_read(thr, f);
387*0Sigor@sysoev.ru     }
388*0Sigor@sysoev.ru 
389*0Sigor@sysoev.ru     return;
390*0Sigor@sysoev.ru 
391*0Sigor@sysoev.ru fail:
392*0Sigor@sysoev.ru 
393*0Sigor@sysoev.ru     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
394*0Sigor@sysoev.ru                               f, f->data, thr->log);
395*0Sigor@sysoev.ru }
396*0Sigor@sysoev.ru 
397*0Sigor@sysoev.ru 
398*0Sigor@sysoev.ru static void
399*0Sigor@sysoev.ru nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
400*0Sigor@sysoev.ru {
401*0Sigor@sysoev.ru     nxt_buf_t             *fb, *b;
402*0Sigor@sysoev.ru     nxt_buf_filter_t      *f;
403*0Sigor@sysoev.ru     nxt_buf_filter_ctx_t  *ctx;
404*0Sigor@sysoev.ru 
405*0Sigor@sysoev.ru     b = obj;
406*0Sigor@sysoev.ru     ctx = data;
407*0Sigor@sysoev.ru     f = ctx->filter;
408*0Sigor@sysoev.ru 
409*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O",
410*0Sigor@sysoev.ru                   b, f->filter_file->job_file.file.name,
411*0Sigor@sysoev.ru                   b->file_pos, b->file_end);
412*0Sigor@sysoev.ru 
413*0Sigor@sysoev.ru     /* nxt_http_send_filter() might clear a buffer's file status. */
414*0Sigor@sysoev.ru     b->is_file = 1;
415*0Sigor@sysoev.ru 
416*0Sigor@sysoev.ru     fb = ctx->buf;
417*0Sigor@sysoev.ru 
418*0Sigor@sysoev.ru     nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t));
419*0Sigor@sysoev.ru     nxt_buf_pool_free(&f->filter_file->buffers, b);
420*0Sigor@sysoev.ru 
421*0Sigor@sysoev.ru     if (fb->file_pos < fb->file_end) {
422*0Sigor@sysoev.ru         nxt_buf_filter_file_read(thr, f);
423*0Sigor@sysoev.ru         return;
424*0Sigor@sysoev.ru     }
425*0Sigor@sysoev.ru 
426*0Sigor@sysoev.ru     if (b->file_end == fb->file_end) {
427*0Sigor@sysoev.ru         nxt_buf_pool_destroy(&f->filter_file->buffers);
428*0Sigor@sysoev.ru 
429*0Sigor@sysoev.ru         nxt_job_destroy(&f->filter_file->job_file.job);
430*0Sigor@sysoev.ru 
431*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler,
432*0Sigor@sysoev.ru                                   fb, fb->parent, thr->log);
433*0Sigor@sysoev.ru     }
434*0Sigor@sysoev.ru 
435*0Sigor@sysoev.ru     nxt_buf_filter(thr, f, NULL);
436*0Sigor@sysoev.ru }
437*0Sigor@sysoev.ru 
438*0Sigor@sysoev.ru 
439*0Sigor@sysoev.ru static void
440*0Sigor@sysoev.ru nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data)
441*0Sigor@sysoev.ru {
442*0Sigor@sysoev.ru     nxt_buf_filter_t  *f;
443*0Sigor@sysoev.ru 
444*0Sigor@sysoev.ru     f = data;
445*0Sigor@sysoev.ru 
446*0Sigor@sysoev.ru     nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
447*0Sigor@sysoev.ru                               f, f->data, thr->log);
448*0Sigor@sysoev.ru }
449