nxt_buf_filter.c (0:a63ceefd6ab0) nxt_buf_filter.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
12static void nxt_buf_filter_file_read_start(nxt_thread_t *thr,
12static void nxt_buf_filter_file_read_start(nxt_task_t *task,
13 nxt_buf_filter_t *f);
13 nxt_buf_filter_t *f);
14static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f);
15static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr,
16 void *obj, void *data);
17static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj,
14static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
15static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
18 void *data);
16 void *data);
19static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj,
17static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
20 void *data);
18 void *data);
19static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
20 void *data);
21
22
23void
21
22
23void
24nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b)
24nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
25{
26 nxt_buf_chain_add(&f->input, b);
27
25{
26 nxt_buf_chain_add(&f->input, b);
27
28 nxt_buf_filter(thr, f, NULL);
28 nxt_buf_filter(task, f, NULL);
29}
30
31
32void
29}
30
31
32void
33nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
33nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
34{
35 nxt_int_t ret;
36 nxt_buf_t *b;
37 nxt_buf_filter_t *f;
38
39 f = obj;
40
34{
35 nxt_int_t ret;
36 nxt_buf_t *b;
37 nxt_buf_filter_t *f;
38
39 f = obj;
40
41 nxt_log_debug(thr->log, "buf filter");
41 nxt_debug(task, "buf filter");
42
43 if (f->done) {
44 return;
45 }
46
47 f->queued = 0;
48
49 for ( ;; ) {

--- 4 unchanged lines hidden (view full) ---

54 * of memory/file or mapped/file buffers which are read of
55 * or populated from file;
56 * f->output is a chain of output buffers;
57 * f->last is the last output buffer in the chain.
58 */
59
60 b = f->current;
61
42
43 if (f->done) {
44 return;
45 }
46
47 f->queued = 0;
48
49 for ( ;; ) {

--- 4 unchanged lines hidden (view full) ---

54 * of memory/file or mapped/file buffers which are read of
55 * or populated from file;
56 * f->output is a chain of output buffers;
57 * f->last is the last output buffer in the chain.
58 */
59
60 b = f->current;
61
62 nxt_log_debug(thr->log, "buf filter current: %p", b);
62 nxt_debug(task, "buf filter current: %p", b);
63
64 if (b == NULL) {
65
66 if (f->reading) {
67 return;
68 }
69
70 b = f->input;
71
63
64 if (b == NULL) {
65
66 if (f->reading) {
67 return;
68 }
69
70 b = f->input;
71
72 nxt_log_debug(thr->log, "buf filter input: %p", b);
72 nxt_debug(task, "buf filter input: %p", b);
73
74 if (b == NULL) {
75 /*
76 * The end of the input chain, pass
77 * the output chain to the next filter.
78 */
79 nxt_buf_filter_next(f);
80

--- 7 unchanged lines hidden (view full) ---

88 b->next = NULL;
89
90 } else if (nxt_buf_is_file(b)) {
91
92 if (f->run->filter_ready(f) != NXT_OK) {
93 nxt_buf_filter_next(f);
94 }
95
73
74 if (b == NULL) {
75 /*
76 * The end of the input chain, pass
77 * the output chain to the next filter.
78 */
79 nxt_buf_filter_next(f);
80

--- 7 unchanged lines hidden (view full) ---

88 b->next = NULL;
89
90 } else if (nxt_buf_is_file(b)) {
91
92 if (f->run->filter_ready(f) != NXT_OK) {
93 nxt_buf_filter_next(f);
94 }
95
96 nxt_buf_filter_file_read_start(thr, f);
96 nxt_buf_filter_file_read_start(task, f);
97 return;
98 }
99 }
100
101 if (nxt_buf_is_sync(b)) {
102
103 ret = NXT_OK;
104 f->current = b;

--- 29 unchanged lines hidden (view full) ---

134 if (nxt_fast_path(ret == NXT_OK)) {
135 b = f->current;
136 /*
137 * A filter may just move f->current to f->output
138 * and then set f->current to NULL.
139 */
140 if (b != NULL && b->mem.pos == b->mem.free) {
141 f->current = b->next;
97 return;
98 }
99 }
100
101 if (nxt_buf_is_sync(b)) {
102
103 ret = NXT_OK;
104 f->current = b;

--- 29 unchanged lines hidden (view full) ---

134 if (nxt_fast_path(ret == NXT_OK)) {
135 b = f->current;
136 /*
137 * A filter may just move f->current to f->output
138 * and then set f->current to NULL.
139 */
140 if (b != NULL && b->mem.pos == b->mem.free) {
141 f->current = b->next;
142 nxt_thread_work_queue_add(thr, f->work_queue,
142 nxt_thread_work_queue_add(task->thread, f->work_queue,
143 b->completion_handler,
143 b->completion_handler,
144 b, b->parent, thr->log);
144 task, b, b->parent);
145 }
146
147 continue;
148 }
149
150 if (nxt_slow_path(ret == NXT_ERROR)) {
151 goto fail;
152 }

--- 7 unchanged lines hidden (view full) ---

160 /* ret == NXT_AGAIN: No filter internal buffers available. */
161
162 if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163 return;
164 }
165
166fail:
167
145 }
146
147 continue;
148 }
149
150 if (nxt_slow_path(ret == NXT_ERROR)) {
151 goto fail;
152 }

--- 7 unchanged lines hidden (view full) ---

160 /* ret == NXT_AGAIN: No filter internal buffers available. */
161
162 if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163 return;
164 }
165
166fail:
167
168 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
169 f, f->data, thr->log);
168 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
169 task, f, f->data);
170}
171
172
173static nxt_int_t
174nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175{
176 nxt_buf_t *b;
177

--- 25 unchanged lines hidden (view full) ---

203
204 f->run->filter_next(f);
205 f->output = NULL;
206 }
207}
208
209
210void
170}
171
172
173static nxt_int_t
174nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175{
176 nxt_buf_t *b;
177

--- 25 unchanged lines hidden (view full) ---

203
204 f->run->filter_next(f);
205 f->output = NULL;
206 }
207}
208
209
210void
211nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f)
211nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
212{
212{
213 nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued);
213 nxt_debug(task, "buf filter enqueue: %d", f->queued);
214
215 if (!f->queued && !f->done) {
216 f->queued = 1;
214
215 if (!f->queued && !f->done) {
216 f->queued = 1;
217 nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter,
218 f, NULL, thr->log);
217 nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
218 task, f, NULL);
219 }
220}
221
222
223static void
219 }
220}
221
222
223static void
224nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
224nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
225{
226 nxt_job_file_t *jbf;
227 nxt_buf_filter_file_t *ff;
228
229 ff = f->run->job_file_create(f);
230
231 if (nxt_slow_path(ff == NULL)) {
225{
226 nxt_job_file_t *jbf;
227 nxt_buf_filter_file_t *ff;
228
229 ff = f->run->job_file_create(f);
230
231 if (nxt_slow_path(ff == NULL)) {
232 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
233 f, f->data, thr->log);
232 nxt_thread_work_queue_add(task->thread, f->work_queue,
233 f->run->filter_error,
234 task, f, f->data);
234 return;
235 }
236
237 f->filter_file = ff;
238
239 jbf = &ff->job_file;
240 jbf->file = *f->input->file;
241
242 jbf->ready_handler = nxt_buf_filter_file_job_completion;
243 jbf->error_handler = nxt_buf_filter_file_read_error;
244
245 nxt_job_set_name(&jbf->job, "buf filter job file");
246
247 f->reading = 1;
248
235 return;
236 }
237
238 f->filter_file = ff;
239
240 jbf = &ff->job_file;
241 jbf->file = *f->input->file;
242
243 jbf->ready_handler = nxt_buf_filter_file_job_completion;
244 jbf->error_handler = nxt_buf_filter_file_read_error;
245
246 nxt_job_set_name(&jbf->job, "buf filter job file");
247
248 f->reading = 1;
249
249 nxt_buf_filter_file_read(thr, f);
250 nxt_buf_filter_file_read(task, f);
250}
251
252
253static void
251}
252
253
254static void
254nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
255nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
255{
256 nxt_int_t ret;
257 nxt_off_t size;
258 nxt_buf_t *b;
259 nxt_buf_filter_file_t *ff;
260
261 ff = f->filter_file;
262

--- 28 unchanged lines hidden (view full) ---

291 b->file_end = f->input->file_pos;
292 b->file = f->input->file;
293
294 ff->job_file.buffer = b;
295 ff->job_file.offset = f->input->file_pos;
296
297 f->run->job_file_retain(f);
298
256{
257 nxt_int_t ret;
258 nxt_off_t size;
259 nxt_buf_t *b;
260 nxt_buf_filter_file_t *ff;
261
262 ff = f->filter_file;
263

--- 28 unchanged lines hidden (view full) ---

292 b->file_end = f->input->file_pos;
293 b->file = f->input->file;
294
295 ff->job_file.buffer = b;
296 ff->job_file.offset = f->input->file_pos;
297
298 f->run->job_file_retain(f);
299
299 nxt_job_file_read(thr, &ff->job_file.job);
300 nxt_job_file_read(task, &ff->job_file.job);
300 return;
301 }
302
303 if (nxt_fast_path(ret != NXT_ERROR)) {
304
305 /* ret == NXT_AGAIN: No buffers available. */
306
307 if (f->buffering) {
308 f->buffering = 0;
309
310 if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
311 return;
312 }
313
314 } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
315 return;
316 }
317 }
318
301 return;
302 }
303
304 if (nxt_fast_path(ret != NXT_ERROR)) {
305
306 /* ret == NXT_AGAIN: No buffers available. */
307
308 if (f->buffering) {
309 f->buffering = 0;
310
311 if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
312 return;
313 }
314
315 } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
316 return;
317 }
318 }
319
319 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
320 f, f->data, thr->log);
320 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
321 task, f, f->data);
321}
322
323
324typedef struct {
325 nxt_buf_filter_t *filter;
326 nxt_buf_t *buf;
327} nxt_buf_filter_ctx_t;
328
329
330static void
322}
323
324
325typedef struct {
326 nxt_buf_filter_t *filter;
327 nxt_buf_t *buf;
328} nxt_buf_filter_ctx_t;
329
330
331static void
331nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
332nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
332{
333 nxt_buf_t *b;
334 nxt_bool_t done;
335 nxt_job_file_t *jbf;
336 nxt_buf_filter_t *f;
337 nxt_buf_filter_ctx_t *ctx;
338
339 jbf = obj;
340 f = data;
341 b = jbf->buffer;
342 jbf->buffer = NULL;
343
333{
334 nxt_buf_t *b;
335 nxt_bool_t done;
336 nxt_job_file_t *jbf;
337 nxt_buf_filter_t *f;
338 nxt_buf_filter_ctx_t *ctx;
339
340 jbf = obj;
341 f = data;
342 b = jbf->buffer;
343 jbf->buffer = NULL;
344
344 nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O",
345 jbf->file.name, b->file_pos, b->file_end);
345 nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
346 jbf->file.name, b->file_pos, b->file_end);
346
347 f->run->job_file_release(f);
348
349 ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
350 if (nxt_slow_path(ctx == NULL)) {
351 goto fail;
352 }
353

--- 11 unchanged lines hidden (view full) ---

365
366 b->data = f->data;
367 b->completion_handler = nxt_buf_filter_buf_completion;
368 b->parent = (nxt_buf_t *) ctx;
369 b->next = NULL;
370
371 nxt_buf_chain_add(&f->current, b);
372
347
348 f->run->job_file_release(f);
349
350 ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
351 if (nxt_slow_path(ctx == NULL)) {
352 goto fail;
353 }
354

--- 11 unchanged lines hidden (view full) ---

366
367 b->data = f->data;
368 b->completion_handler = nxt_buf_filter_buf_completion;
369 b->parent = (nxt_buf_t *) ctx;
370 b->next = NULL;
371
372 nxt_buf_chain_add(&f->current, b);
373
373 nxt_buf_filter(thr, f, NULL);
374 nxt_buf_filter(task, f, NULL);
374
375 if (b->mem.pos == b->mem.free) {
376 /*
377 * The buffer has been completely processed by nxt_buf_filter(),
378 * its completion handler has been placed in workqueue and
379 * nxt_buf_filter_buf_completion() should be eventually called.
380 */
381 return;
382 }
383
384 if (!done) {
385 /* Try to allocate another buffer and read the next file part. */
375
376 if (b->mem.pos == b->mem.free) {
377 /*
378 * The buffer has been completely processed by nxt_buf_filter(),
379 * its completion handler has been placed in workqueue and
380 * nxt_buf_filter_buf_completion() should be eventually called.
381 */
382 return;
383 }
384
385 if (!done) {
386 /* Try to allocate another buffer and read the next file part. */
386 nxt_buf_filter_file_read(thr, f);
387 nxt_buf_filter_file_read(task, f);
387 }
388
389 return;
390
391fail:
392
388 }
389
390 return;
391
392fail:
393
393 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
394 f, f->data, thr->log);
394 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
395 task, f, f->data);
395}
396
397
398static void
396}
397
398
399static void
399nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
400nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
400{
401 nxt_buf_t *fb, *b;
402 nxt_buf_filter_t *f;
403 nxt_buf_filter_ctx_t *ctx;
404
405 b = obj;
406 ctx = data;
407 f = ctx->filter;
408
401{
402 nxt_buf_t *fb, *b;
403 nxt_buf_filter_t *f;
404 nxt_buf_filter_ctx_t *ctx;
405
406 b = obj;
407 ctx = data;
408 f = ctx->filter;
409
409 nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O",
410 b, f->filter_file->job_file.file.name,
411 b->file_pos, b->file_end);
410 nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
411 b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
412
413 /* nxt_http_send_filter() might clear a buffer's file status. */
414 b->is_file = 1;
415
416 fb = ctx->buf;
417
418 nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t));
419 nxt_buf_pool_free(&f->filter_file->buffers, b);
420
421 if (fb->file_pos < fb->file_end) {
412
413 /* nxt_http_send_filter() might clear a buffer's file status. */
414 b->is_file = 1;
415
416 fb = ctx->buf;
417
418 nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t));
419 nxt_buf_pool_free(&f->filter_file->buffers, b);
420
421 if (fb->file_pos < fb->file_end) {
422 nxt_buf_filter_file_read(thr, f);
422 nxt_buf_filter_file_read(task, f);
423 return;
424 }
425
426 if (b->file_end == fb->file_end) {
427 nxt_buf_pool_destroy(&f->filter_file->buffers);
428
429 nxt_job_destroy(&f->filter_file->job_file.job);
430
423 return;
424 }
425
426 if (b->file_end == fb->file_end) {
427 nxt_buf_pool_destroy(&f->filter_file->buffers);
428
429 nxt_job_destroy(&f->filter_file->job_file.job);
430
431 nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler,
432 fb, fb->parent, thr->log);
431 nxt_thread_work_queue_add(task->thread, f->work_queue,
432 fb->completion_handler,
433 task, fb, fb->parent);
433 }
434
434 }
435
435 nxt_buf_filter(thr, f, NULL);
436 nxt_buf_filter(task, f, NULL);
436}
437
438
439static void
437}
438
439
440static void
440nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data)
441nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
441{
442 nxt_buf_filter_t *f;
443
444 f = data;
445
442{
443 nxt_buf_filter_t *f;
444
445 f = data;
446
446 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
447 f, f->data, thr->log);
447 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
448 task, f, f->data);
448}
449}