xref: /unit/src/nxt_buf_filter.c (revision 1:fdc027c56872)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11 nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
12 static void nxt_buf_filter_file_read_start(nxt_task_t *task,
13     nxt_buf_filter_t *f);
14 static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
15 static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
20     void *data);
21 
22 
23 void
24 nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
25 {
26     nxt_buf_chain_add(&f->input, b);
27 
28     nxt_buf_filter(task, f, NULL);
29 }
30 
31 
32 void
33 nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
34 {
35     nxt_int_t         ret;
36     nxt_buf_t         *b;
37     nxt_buf_filter_t  *f;
38 
39     f = obj;
40 
41     nxt_debug(task, "buf filter");
42 
43     if (f->done) {
44         return;
45     }
46 
47     f->queued = 0;
48 
49     for ( ;; ) {
50         /*
51          * f->input is a chain of original incoming buffers: memory,
52          *     mapped, file, and sync buffers;
53          * f->current is a currently processed memory buffer or a chain
54          *     of memory/file or mapped/file buffers which are read of
55          *     or populated from file;
56          * f->output is a chain of output buffers;
57          * f->last is the last output buffer in the chain.
58          */
59 
60         b = f->current;
61 
62         nxt_debug(task, "buf filter current: %p", b);
63 
64         if (b == NULL) {
65 
66             if (f->reading) {
67                 return;
68             }
69 
70             b = f->input;
71 
72             nxt_debug(task, "buf filter input: %p", b);
73 
74             if (b == NULL) {
75                 /*
76                  * The end of the input chain, pass
77                  * the output chain to the next filter.
78                  */
79                 nxt_buf_filter_next(f);
80 
81                 return;
82             }
83 
84             if (nxt_buf_is_mem(b)) {
85 
86                 f->current = b;
87                 f->input = b->next;
88                 b->next = NULL;
89 
90             } else if (nxt_buf_is_file(b)) {
91 
92                 if (f->run->filter_ready(f) != NXT_OK) {
93                     nxt_buf_filter_next(f);
94                 }
95 
96                 nxt_buf_filter_file_read_start(task, f);
97                 return;
98             }
99         }
100 
101         if (nxt_buf_is_sync(b)) {
102 
103             ret = NXT_OK;
104             f->current = b;
105             f->input = b->next;
106             b->next = NULL;
107 
108             if (nxt_buf_is_nobuf(b)) {
109                 ret = f->run->filter_sync_nobuf(f);
110 
111             } else if (nxt_buf_is_flush(b)) {
112                 ret = f->run->filter_sync_flush(f);
113 
114             } else if (nxt_buf_is_last(b)) {
115                 ret = f->run->filter_sync_last(f);
116 
117                 f->done = (ret == NXT_OK);
118             }
119 
120             if (nxt_fast_path(ret == NXT_OK)) {
121                 continue;
122             }
123 
124             if (nxt_slow_path(ret == NXT_ERROR)) {
125                 goto fail;
126             }
127 
128             /* ret == NXT_AGAIN: No filter internal buffers available. */
129             goto nobuf;
130         }
131 
132         ret = f->run->filter_process(f);
133 
134         if (nxt_fast_path(ret == NXT_OK)) {
135             b = f->current;
136             /*
137              * A filter may just move f->current to f->output
138              * and then set f->current to NULL.
139              */
140             if (b != NULL && b->mem.pos == b->mem.free) {
141                 f->current = b->next;
142                 nxt_thread_work_queue_add(task->thread, f->work_queue,
143                                           b->completion_handler,
144                                           task, b, b->parent);
145             }
146 
147             continue;
148         }
149 
150         if (nxt_slow_path(ret == NXT_ERROR)) {
151             goto fail;
152         }
153 
154         /* ret == NXT_AGAIN: No filter internal buffers available. */
155         goto nobuf;
156     }
157 
158 nobuf:
159 
160     /* ret == NXT_AGAIN: No filter internal buffers available. */
161 
162     if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163         return;
164     }
165 
166 fail:
167 
168     nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
169                               task, f, f->data);
170 }
171 
172 
173 static nxt_int_t
174 nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175 {
176     nxt_buf_t  *b;
177 
178     nxt_thread_log_debug("buf filter nobuf");
179 
180     b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
181 
182     if (nxt_fast_path(b != NULL)) {
183 
184         nxt_buf_chain_add(&f->output, b);
185         f->last = NULL;
186 
187         f->run->filter_next(f);
188 
189         f->output = NULL;
190 
191         return NXT_OK;
192     }
193 
194     return NXT_ERROR;
195 }
196 
197 
198 nxt_inline void
199 nxt_buf_filter_next(nxt_buf_filter_t *f)
200 {
201     if (f->output != NULL) {
202         f->last = NULL;
203 
204         f->run->filter_next(f);
205         f->output = NULL;
206     }
207 }
208 
209 
210 void
211 nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
212 {
213     nxt_debug(task, "buf filter enqueue: %d", f->queued);
214 
215     if (!f->queued && !f->done) {
216         f->queued = 1;
217         nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
218                                   task, f, NULL);
219     }
220 }
221 
222 
223 static void
224 nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
225 {
226     nxt_job_file_t         *jbf;
227     nxt_buf_filter_file_t  *ff;
228 
229     ff = f->run->job_file_create(f);
230 
231     if (nxt_slow_path(ff == NULL)) {
232         nxt_thread_work_queue_add(task->thread, f->work_queue,
233                                   f->run->filter_error,
234                                   task, f, f->data);
235         return;
236     }
237 
238     f->filter_file = ff;
239 
240     jbf = &ff->job_file;
241     jbf->file = *f->input->file;
242 
243     jbf->ready_handler = nxt_buf_filter_file_job_completion;
244     jbf->error_handler = nxt_buf_filter_file_read_error;
245 
246     nxt_job_set_name(&jbf->job, "buf filter job file");
247 
248     f->reading = 1;
249 
250     nxt_buf_filter_file_read(task, f);
251 }
252 
253 
254 static void
255 nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
256 {
257     nxt_int_t              ret;
258     nxt_off_t              size;
259     nxt_buf_t              *b;
260     nxt_buf_filter_file_t  *ff;
261 
262     ff = f->filter_file;
263 
264     if (ff->job_file.buffer != NULL) {
265         /* File is now being read. */
266         return;
267     }
268 
269     size = f->input->file_end - f->input->file_pos;
270 
271     if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
272         /*
273          * Small size value is a hint for buffer pool allocation
274          * size, but if size of the size_t type is lesser than size
275          * of the nxt_off_t type, the large size value may be truncated,
276          * so use a default buffer pool allocation size.
277          */
278         size = 0;
279     }
280 
281     if (f->mmap) {
282         ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
283 
284     } else {
285         ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
286     }
287 
288     if (nxt_fast_path(ret == NXT_OK)) {
289         b = ff->buffers.current;
290 
291         b->file_pos = f->input->file_pos;
292         b->file_end = f->input->file_pos;
293         b->file = f->input->file;
294 
295         ff->job_file.buffer = b;
296         ff->job_file.offset = f->input->file_pos;
297 
298         f->run->job_file_retain(f);
299 
300         nxt_job_file_read(task, &ff->job_file.job);
301         return;
302     }
303 
304     if (nxt_fast_path(ret != NXT_ERROR)) {
305 
306         /* ret == NXT_AGAIN: No buffers available. */
307 
308         if (f->buffering) {
309             f->buffering = 0;
310 
311             if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
312                 return;
313             }
314 
315         } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
316             return;
317         }
318     }
319 
320     nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
321                               task, f, f->data);
322 }
323 
324 
325 typedef struct {
326     nxt_buf_filter_t  *filter;
327     nxt_buf_t         *buf;
328 } nxt_buf_filter_ctx_t;
329 
330 
331 static void
332 nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
333 {
334     nxt_buf_t             *b;
335     nxt_bool_t            done;
336     nxt_job_file_t        *jbf;
337     nxt_buf_filter_t      *f;
338     nxt_buf_filter_ctx_t  *ctx;
339 
340     jbf = obj;
341     f = data;
342     b = jbf->buffer;
343     jbf->buffer = NULL;
344 
345     nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
346               jbf->file.name, b->file_pos, b->file_end);
347 
348     f->run->job_file_release(f);
349 
350     ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
351     if (nxt_slow_path(ctx == NULL)) {
352         goto fail;
353     }
354 
355     ctx->filter = f;
356     ctx->buf = f->input;
357 
358     f->input->file_pos = b->file_end;
359 
360     done = (f->input->file_pos == f->input->file_end);
361 
362     if (done) {
363         f->input = f->input->next;
364         f->reading = 0;
365     }
366 
367     b->data = f->data;
368     b->completion_handler = nxt_buf_filter_buf_completion;
369     b->parent = (nxt_buf_t *) ctx;
370     b->next = NULL;
371 
372     nxt_buf_chain_add(&f->current, b);
373 
374     nxt_buf_filter(task, f, NULL);
375 
376     if (b->mem.pos == b->mem.free) {
377         /*
378          * The buffer has been completely processed by nxt_buf_filter(),
379          * its completion handler has been placed in workqueue and
380          * nxt_buf_filter_buf_completion() should be eventually called.
381          */
382         return;
383     }
384 
385     if (!done) {
386         /* Try to allocate another buffer and read the next file part. */
387         nxt_buf_filter_file_read(task, f);
388     }
389 
390     return;
391 
392 fail:
393 
394     nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
395                               task, f, f->data);
396 }
397 
398 
399 static void
400 nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
401 {
402     nxt_buf_t             *fb, *b;
403     nxt_buf_filter_t      *f;
404     nxt_buf_filter_ctx_t  *ctx;
405 
406     b = obj;
407     ctx = data;
408     f = ctx->filter;
409 
410     nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
411               b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
412 
413     /* nxt_http_send_filter() might clear a buffer's file status. */
414     b->is_file = 1;
415 
416     fb = ctx->buf;
417 
418     nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t));
419     nxt_buf_pool_free(&f->filter_file->buffers, b);
420 
421     if (fb->file_pos < fb->file_end) {
422         nxt_buf_filter_file_read(task, f);
423         return;
424     }
425 
426     if (b->file_end == fb->file_end) {
427         nxt_buf_pool_destroy(&f->filter_file->buffers);
428 
429         nxt_job_destroy(&f->filter_file->job_file.job);
430 
431         nxt_thread_work_queue_add(task->thread, f->work_queue,
432                                   fb->completion_handler,
433                                   task, fb, fb->parent);
434     }
435 
436     nxt_buf_filter(task, f, NULL);
437 }
438 
439 
440 static void
441 nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
442 {
443     nxt_buf_filter_t  *f;
444 
445     f = data;
446 
447     nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
448                               task, f, f->data);
449 }
450