10Sigor@sysoev.ru
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru */
60Sigor@sysoev.ru
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru
90Sigor@sysoev.ru
100Sigor@sysoev.ru static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
110Sigor@sysoev.ru nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
121Sigor@sysoev.ru static void nxt_buf_filter_file_read_start(nxt_task_t *task,
130Sigor@sysoev.ru nxt_buf_filter_t *f);
141Sigor@sysoev.ru static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
151Sigor@sysoev.ru static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
160Sigor@sysoev.ru void *data);
171Sigor@sysoev.ru static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
181Sigor@sysoev.ru void *data);
191Sigor@sysoev.ru static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
200Sigor@sysoev.ru void *data);
210Sigor@sysoev.ru
220Sigor@sysoev.ru
230Sigor@sysoev.ru void
nxt_buf_filter_add(nxt_task_t * task,nxt_buf_filter_t * f,nxt_buf_t * b)241Sigor@sysoev.ru nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
250Sigor@sysoev.ru {
260Sigor@sysoev.ru nxt_buf_chain_add(&f->input, b);
270Sigor@sysoev.ru
281Sigor@sysoev.ru nxt_buf_filter(task, f, NULL);
290Sigor@sysoev.ru }
300Sigor@sysoev.ru
310Sigor@sysoev.ru
320Sigor@sysoev.ru void
nxt_buf_filter(nxt_task_t * task,void * obj,void * data)331Sigor@sysoev.ru nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
340Sigor@sysoev.ru {
350Sigor@sysoev.ru nxt_int_t ret;
360Sigor@sysoev.ru nxt_buf_t *b;
370Sigor@sysoev.ru nxt_buf_filter_t *f;
380Sigor@sysoev.ru
390Sigor@sysoev.ru f = obj;
400Sigor@sysoev.ru
411Sigor@sysoev.ru nxt_debug(task, "buf filter");
420Sigor@sysoev.ru
430Sigor@sysoev.ru if (f->done) {
440Sigor@sysoev.ru return;
450Sigor@sysoev.ru }
460Sigor@sysoev.ru
470Sigor@sysoev.ru f->queued = 0;
480Sigor@sysoev.ru
490Sigor@sysoev.ru for ( ;; ) {
500Sigor@sysoev.ru /*
510Sigor@sysoev.ru * f->input is a chain of original incoming buffers: memory,
520Sigor@sysoev.ru * mapped, file, and sync buffers;
530Sigor@sysoev.ru * f->current is a currently processed memory buffer or a chain
540Sigor@sysoev.ru * of memory/file or mapped/file buffers which are read of
550Sigor@sysoev.ru * or populated from file;
560Sigor@sysoev.ru * f->output is a chain of output buffers;
570Sigor@sysoev.ru * f->last is the last output buffer in the chain.
580Sigor@sysoev.ru */
590Sigor@sysoev.ru
600Sigor@sysoev.ru b = f->current;
610Sigor@sysoev.ru
621Sigor@sysoev.ru nxt_debug(task, "buf filter current: %p", b);
630Sigor@sysoev.ru
640Sigor@sysoev.ru if (b == NULL) {
650Sigor@sysoev.ru
660Sigor@sysoev.ru if (f->reading) {
670Sigor@sysoev.ru return;
680Sigor@sysoev.ru }
690Sigor@sysoev.ru
700Sigor@sysoev.ru b = f->input;
710Sigor@sysoev.ru
721Sigor@sysoev.ru nxt_debug(task, "buf filter input: %p", b);
730Sigor@sysoev.ru
740Sigor@sysoev.ru if (b == NULL) {
750Sigor@sysoev.ru /*
760Sigor@sysoev.ru * The end of the input chain, pass
770Sigor@sysoev.ru * the output chain to the next filter.
780Sigor@sysoev.ru */
790Sigor@sysoev.ru nxt_buf_filter_next(f);
800Sigor@sysoev.ru
810Sigor@sysoev.ru return;
820Sigor@sysoev.ru }
830Sigor@sysoev.ru
840Sigor@sysoev.ru if (nxt_buf_is_mem(b)) {
850Sigor@sysoev.ru
860Sigor@sysoev.ru f->current = b;
870Sigor@sysoev.ru f->input = b->next;
880Sigor@sysoev.ru b->next = NULL;
890Sigor@sysoev.ru
900Sigor@sysoev.ru } else if (nxt_buf_is_file(b)) {
910Sigor@sysoev.ru
920Sigor@sysoev.ru if (f->run->filter_ready(f) != NXT_OK) {
930Sigor@sysoev.ru nxt_buf_filter_next(f);
940Sigor@sysoev.ru }
950Sigor@sysoev.ru
961Sigor@sysoev.ru nxt_buf_filter_file_read_start(task, f);
970Sigor@sysoev.ru return;
980Sigor@sysoev.ru }
990Sigor@sysoev.ru }
1000Sigor@sysoev.ru
1010Sigor@sysoev.ru if (nxt_buf_is_sync(b)) {
1020Sigor@sysoev.ru
1030Sigor@sysoev.ru ret = NXT_OK;
1040Sigor@sysoev.ru f->current = b;
1050Sigor@sysoev.ru f->input = b->next;
1060Sigor@sysoev.ru b->next = NULL;
1070Sigor@sysoev.ru
1080Sigor@sysoev.ru if (nxt_buf_is_nobuf(b)) {
1090Sigor@sysoev.ru ret = f->run->filter_sync_nobuf(f);
1100Sigor@sysoev.ru
1110Sigor@sysoev.ru } else if (nxt_buf_is_flush(b)) {
1120Sigor@sysoev.ru ret = f->run->filter_sync_flush(f);
1130Sigor@sysoev.ru
1140Sigor@sysoev.ru } else if (nxt_buf_is_last(b)) {
1150Sigor@sysoev.ru ret = f->run->filter_sync_last(f);
1160Sigor@sysoev.ru
1170Sigor@sysoev.ru f->done = (ret == NXT_OK);
1180Sigor@sysoev.ru }
1190Sigor@sysoev.ru
1200Sigor@sysoev.ru if (nxt_fast_path(ret == NXT_OK)) {
1210Sigor@sysoev.ru continue;
1220Sigor@sysoev.ru }
1230Sigor@sysoev.ru
1240Sigor@sysoev.ru if (nxt_slow_path(ret == NXT_ERROR)) {
1250Sigor@sysoev.ru goto fail;
1260Sigor@sysoev.ru }
1270Sigor@sysoev.ru
1280Sigor@sysoev.ru /* ret == NXT_AGAIN: No filter internal buffers available. */
1290Sigor@sysoev.ru goto nobuf;
1300Sigor@sysoev.ru }
1310Sigor@sysoev.ru
1320Sigor@sysoev.ru ret = f->run->filter_process(f);
1330Sigor@sysoev.ru
1340Sigor@sysoev.ru if (nxt_fast_path(ret == NXT_OK)) {
1350Sigor@sysoev.ru b = f->current;
1360Sigor@sysoev.ru /*
1370Sigor@sysoev.ru * A filter may just move f->current to f->output
1380Sigor@sysoev.ru * and then set f->current to NULL.
1390Sigor@sysoev.ru */
1400Sigor@sysoev.ru if (b != NULL && b->mem.pos == b->mem.free) {
1410Sigor@sysoev.ru f->current = b->next;
1421Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue,
1430Sigor@sysoev.ru b->completion_handler,
1441Sigor@sysoev.ru task, b, b->parent);
1450Sigor@sysoev.ru }
1460Sigor@sysoev.ru
1470Sigor@sysoev.ru continue;
1480Sigor@sysoev.ru }
1490Sigor@sysoev.ru
1500Sigor@sysoev.ru if (nxt_slow_path(ret == NXT_ERROR)) {
1510Sigor@sysoev.ru goto fail;
1520Sigor@sysoev.ru }
1530Sigor@sysoev.ru
1540Sigor@sysoev.ru /* ret == NXT_AGAIN: No filter internal buffers available. */
1550Sigor@sysoev.ru goto nobuf;
1560Sigor@sysoev.ru }
1570Sigor@sysoev.ru
1580Sigor@sysoev.ru nobuf:
1590Sigor@sysoev.ru
1600Sigor@sysoev.ru /* ret == NXT_AGAIN: No filter internal buffers available. */
1610Sigor@sysoev.ru
1620Sigor@sysoev.ru if (nxt_buf_filter_nobuf(f) == NXT_OK) {
1630Sigor@sysoev.ru return;
1640Sigor@sysoev.ru }
1650Sigor@sysoev.ru
1660Sigor@sysoev.ru fail:
1670Sigor@sysoev.ru
1681Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
1691Sigor@sysoev.ru task, f, f->data);
1700Sigor@sysoev.ru }
1710Sigor@sysoev.ru
1720Sigor@sysoev.ru
1730Sigor@sysoev.ru static nxt_int_t
nxt_buf_filter_nobuf(nxt_buf_filter_t * f)1740Sigor@sysoev.ru nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
1750Sigor@sysoev.ru {
1760Sigor@sysoev.ru nxt_buf_t *b;
1770Sigor@sysoev.ru
1780Sigor@sysoev.ru nxt_thread_log_debug("buf filter nobuf");
1790Sigor@sysoev.ru
1800Sigor@sysoev.ru b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
1810Sigor@sysoev.ru
1820Sigor@sysoev.ru if (nxt_fast_path(b != NULL)) {
1830Sigor@sysoev.ru
1840Sigor@sysoev.ru nxt_buf_chain_add(&f->output, b);
1850Sigor@sysoev.ru f->last = NULL;
1860Sigor@sysoev.ru
1870Sigor@sysoev.ru f->run->filter_next(f);
1880Sigor@sysoev.ru
1890Sigor@sysoev.ru f->output = NULL;
1900Sigor@sysoev.ru
1910Sigor@sysoev.ru return NXT_OK;
1920Sigor@sysoev.ru }
1930Sigor@sysoev.ru
1940Sigor@sysoev.ru return NXT_ERROR;
1950Sigor@sysoev.ru }
1960Sigor@sysoev.ru
1970Sigor@sysoev.ru
1980Sigor@sysoev.ru nxt_inline void
nxt_buf_filter_next(nxt_buf_filter_t * f)1990Sigor@sysoev.ru nxt_buf_filter_next(nxt_buf_filter_t *f)
2000Sigor@sysoev.ru {
2010Sigor@sysoev.ru if (f->output != NULL) {
2020Sigor@sysoev.ru f->last = NULL;
2030Sigor@sysoev.ru
2040Sigor@sysoev.ru f->run->filter_next(f);
2050Sigor@sysoev.ru f->output = NULL;
2060Sigor@sysoev.ru }
2070Sigor@sysoev.ru }
2080Sigor@sysoev.ru
2090Sigor@sysoev.ru
2100Sigor@sysoev.ru void
nxt_buf_filter_enqueue(nxt_task_t * task,nxt_buf_filter_t * f)2111Sigor@sysoev.ru nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
2120Sigor@sysoev.ru {
2131Sigor@sysoev.ru nxt_debug(task, "buf filter enqueue: %d", f->queued);
2140Sigor@sysoev.ru
2150Sigor@sysoev.ru if (!f->queued && !f->done) {
2160Sigor@sysoev.ru f->queued = 1;
2171Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
2181Sigor@sysoev.ru task, f, NULL);
2190Sigor@sysoev.ru }
2200Sigor@sysoev.ru }
2210Sigor@sysoev.ru
2220Sigor@sysoev.ru
2230Sigor@sysoev.ru static void
nxt_buf_filter_file_read_start(nxt_task_t * task,nxt_buf_filter_t * f)2241Sigor@sysoev.ru nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
2250Sigor@sysoev.ru {
2260Sigor@sysoev.ru nxt_job_file_t *jbf;
2270Sigor@sysoev.ru nxt_buf_filter_file_t *ff;
2280Sigor@sysoev.ru
2290Sigor@sysoev.ru ff = f->run->job_file_create(f);
2300Sigor@sysoev.ru
2310Sigor@sysoev.ru if (nxt_slow_path(ff == NULL)) {
2321Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue,
2331Sigor@sysoev.ru f->run->filter_error,
2341Sigor@sysoev.ru task, f, f->data);
2350Sigor@sysoev.ru return;
2360Sigor@sysoev.ru }
2370Sigor@sysoev.ru
2380Sigor@sysoev.ru f->filter_file = ff;
2390Sigor@sysoev.ru
2400Sigor@sysoev.ru jbf = &ff->job_file;
2410Sigor@sysoev.ru jbf->file = *f->input->file;
2420Sigor@sysoev.ru
2430Sigor@sysoev.ru jbf->ready_handler = nxt_buf_filter_file_job_completion;
2440Sigor@sysoev.ru jbf->error_handler = nxt_buf_filter_file_read_error;
2450Sigor@sysoev.ru
2460Sigor@sysoev.ru nxt_job_set_name(&jbf->job, "buf filter job file");
2470Sigor@sysoev.ru
2480Sigor@sysoev.ru f->reading = 1;
2490Sigor@sysoev.ru
2501Sigor@sysoev.ru nxt_buf_filter_file_read(task, f);
2510Sigor@sysoev.ru }
2520Sigor@sysoev.ru
2530Sigor@sysoev.ru
2540Sigor@sysoev.ru static void
nxt_buf_filter_file_read(nxt_task_t * task,nxt_buf_filter_t * f)2551Sigor@sysoev.ru nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
2560Sigor@sysoev.ru {
2570Sigor@sysoev.ru nxt_int_t ret;
2580Sigor@sysoev.ru nxt_off_t size;
2590Sigor@sysoev.ru nxt_buf_t *b;
2600Sigor@sysoev.ru nxt_buf_filter_file_t *ff;
2610Sigor@sysoev.ru
2620Sigor@sysoev.ru ff = f->filter_file;
2630Sigor@sysoev.ru
2640Sigor@sysoev.ru if (ff->job_file.buffer != NULL) {
2650Sigor@sysoev.ru /* File is now being read. */
2660Sigor@sysoev.ru return;
2670Sigor@sysoev.ru }
2680Sigor@sysoev.ru
2690Sigor@sysoev.ru size = f->input->file_end - f->input->file_pos;
2700Sigor@sysoev.ru
2710Sigor@sysoev.ru if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
2720Sigor@sysoev.ru /*
2730Sigor@sysoev.ru * Small size value is a hint for buffer pool allocation
2740Sigor@sysoev.ru * size, but if size of the size_t type is lesser than size
2750Sigor@sysoev.ru * of the nxt_off_t type, the large size value may be truncated,
2760Sigor@sysoev.ru * so use a default buffer pool allocation size.
2770Sigor@sysoev.ru */
2780Sigor@sysoev.ru size = 0;
2790Sigor@sysoev.ru }
2800Sigor@sysoev.ru
2810Sigor@sysoev.ru if (f->mmap) {
2820Sigor@sysoev.ru ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
2830Sigor@sysoev.ru
2840Sigor@sysoev.ru } else {
2850Sigor@sysoev.ru ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
2860Sigor@sysoev.ru }
2870Sigor@sysoev.ru
2880Sigor@sysoev.ru if (nxt_fast_path(ret == NXT_OK)) {
2890Sigor@sysoev.ru b = ff->buffers.current;
2900Sigor@sysoev.ru
2910Sigor@sysoev.ru b->file_pos = f->input->file_pos;
2920Sigor@sysoev.ru b->file_end = f->input->file_pos;
2930Sigor@sysoev.ru b->file = f->input->file;
2940Sigor@sysoev.ru
2950Sigor@sysoev.ru ff->job_file.buffer = b;
2960Sigor@sysoev.ru ff->job_file.offset = f->input->file_pos;
2970Sigor@sysoev.ru
2980Sigor@sysoev.ru f->run->job_file_retain(f);
2990Sigor@sysoev.ru
3001Sigor@sysoev.ru nxt_job_file_read(task, &ff->job_file.job);
3010Sigor@sysoev.ru return;
3020Sigor@sysoev.ru }
3030Sigor@sysoev.ru
3040Sigor@sysoev.ru if (nxt_fast_path(ret != NXT_ERROR)) {
3050Sigor@sysoev.ru
3060Sigor@sysoev.ru /* ret == NXT_AGAIN: No buffers available. */
3070Sigor@sysoev.ru
3080Sigor@sysoev.ru if (f->buffering) {
3090Sigor@sysoev.ru f->buffering = 0;
3100Sigor@sysoev.ru
3110Sigor@sysoev.ru if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
3120Sigor@sysoev.ru return;
3130Sigor@sysoev.ru }
3140Sigor@sysoev.ru
3150Sigor@sysoev.ru } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
3160Sigor@sysoev.ru return;
3170Sigor@sysoev.ru }
3180Sigor@sysoev.ru }
3190Sigor@sysoev.ru
3201Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
3211Sigor@sysoev.ru task, f, f->data);
3220Sigor@sysoev.ru }
3230Sigor@sysoev.ru
3240Sigor@sysoev.ru
3250Sigor@sysoev.ru typedef struct {
3260Sigor@sysoev.ru nxt_buf_filter_t *filter;
3270Sigor@sysoev.ru nxt_buf_t *buf;
3280Sigor@sysoev.ru } nxt_buf_filter_ctx_t;
3290Sigor@sysoev.ru
3300Sigor@sysoev.ru
3310Sigor@sysoev.ru static void
nxt_buf_filter_file_job_completion(nxt_task_t * task,void * obj,void * data)3321Sigor@sysoev.ru nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
3330Sigor@sysoev.ru {
3340Sigor@sysoev.ru nxt_buf_t *b;
3350Sigor@sysoev.ru nxt_bool_t done;
3360Sigor@sysoev.ru nxt_job_file_t *jbf;
3370Sigor@sysoev.ru nxt_buf_filter_t *f;
3380Sigor@sysoev.ru nxt_buf_filter_ctx_t *ctx;
3390Sigor@sysoev.ru
3400Sigor@sysoev.ru jbf = obj;
3410Sigor@sysoev.ru f = data;
3420Sigor@sysoev.ru b = jbf->buffer;
3430Sigor@sysoev.ru jbf->buffer = NULL;
3440Sigor@sysoev.ru
3451Sigor@sysoev.ru nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
3461Sigor@sysoev.ru jbf->file.name, b->file_pos, b->file_end);
3470Sigor@sysoev.ru
3480Sigor@sysoev.ru f->run->job_file_release(f);
3490Sigor@sysoev.ru
3500Sigor@sysoev.ru ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
3510Sigor@sysoev.ru if (nxt_slow_path(ctx == NULL)) {
3520Sigor@sysoev.ru goto fail;
3530Sigor@sysoev.ru }
3540Sigor@sysoev.ru
3550Sigor@sysoev.ru ctx->filter = f;
3560Sigor@sysoev.ru ctx->buf = f->input;
3570Sigor@sysoev.ru
3580Sigor@sysoev.ru f->input->file_pos = b->file_end;
3590Sigor@sysoev.ru
3600Sigor@sysoev.ru done = (f->input->file_pos == f->input->file_end);
3610Sigor@sysoev.ru
3620Sigor@sysoev.ru if (done) {
3630Sigor@sysoev.ru f->input = f->input->next;
3640Sigor@sysoev.ru f->reading = 0;
3650Sigor@sysoev.ru }
3660Sigor@sysoev.ru
3670Sigor@sysoev.ru b->data = f->data;
3680Sigor@sysoev.ru b->completion_handler = nxt_buf_filter_buf_completion;
3690Sigor@sysoev.ru b->parent = (nxt_buf_t *) ctx;
3700Sigor@sysoev.ru b->next = NULL;
3710Sigor@sysoev.ru
3720Sigor@sysoev.ru nxt_buf_chain_add(&f->current, b);
3730Sigor@sysoev.ru
3741Sigor@sysoev.ru nxt_buf_filter(task, f, NULL);
3750Sigor@sysoev.ru
3760Sigor@sysoev.ru if (b->mem.pos == b->mem.free) {
3770Sigor@sysoev.ru /*
3780Sigor@sysoev.ru * The buffer has been completely processed by nxt_buf_filter(),
3790Sigor@sysoev.ru * its completion handler has been placed in workqueue and
3800Sigor@sysoev.ru * nxt_buf_filter_buf_completion() should be eventually called.
3810Sigor@sysoev.ru */
3820Sigor@sysoev.ru return;
3830Sigor@sysoev.ru }
3840Sigor@sysoev.ru
3850Sigor@sysoev.ru if (!done) {
3860Sigor@sysoev.ru /* Try to allocate another buffer and read the next file part. */
3871Sigor@sysoev.ru nxt_buf_filter_file_read(task, f);
3880Sigor@sysoev.ru }
3890Sigor@sysoev.ru
3900Sigor@sysoev.ru return;
3910Sigor@sysoev.ru
3920Sigor@sysoev.ru fail:
3930Sigor@sysoev.ru
3941Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
3951Sigor@sysoev.ru task, f, f->data);
3960Sigor@sysoev.ru }
3970Sigor@sysoev.ru
3980Sigor@sysoev.ru
3990Sigor@sysoev.ru static void
nxt_buf_filter_buf_completion(nxt_task_t * task,void * obj,void * data)4001Sigor@sysoev.ru nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
4010Sigor@sysoev.ru {
4020Sigor@sysoev.ru nxt_buf_t *fb, *b;
4030Sigor@sysoev.ru nxt_buf_filter_t *f;
4040Sigor@sysoev.ru nxt_buf_filter_ctx_t *ctx;
4050Sigor@sysoev.ru
4060Sigor@sysoev.ru b = obj;
4070Sigor@sysoev.ru ctx = data;
4080Sigor@sysoev.ru f = ctx->filter;
4090Sigor@sysoev.ru
4101Sigor@sysoev.ru nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
4111Sigor@sysoev.ru b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
4120Sigor@sysoev.ru
4130Sigor@sysoev.ru /* nxt_http_send_filter() might clear a buffer's file status. */
4140Sigor@sysoev.ru b->is_file = 1;
4150Sigor@sysoev.ru
4160Sigor@sysoev.ru fb = ctx->buf;
4170Sigor@sysoev.ru
418*65Sigor@sysoev.ru nxt_mp_free(f->mem_pool, ctx);
4190Sigor@sysoev.ru nxt_buf_pool_free(&f->filter_file->buffers, b);
4200Sigor@sysoev.ru
4210Sigor@sysoev.ru if (fb->file_pos < fb->file_end) {
4221Sigor@sysoev.ru nxt_buf_filter_file_read(task, f);
4230Sigor@sysoev.ru return;
4240Sigor@sysoev.ru }
4250Sigor@sysoev.ru
4260Sigor@sysoev.ru if (b->file_end == fb->file_end) {
4270Sigor@sysoev.ru nxt_buf_pool_destroy(&f->filter_file->buffers);
4280Sigor@sysoev.ru
4290Sigor@sysoev.ru nxt_job_destroy(&f->filter_file->job_file.job);
4300Sigor@sysoev.ru
4311Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue,
4321Sigor@sysoev.ru fb->completion_handler,
4331Sigor@sysoev.ru task, fb, fb->parent);
4340Sigor@sysoev.ru }
4350Sigor@sysoev.ru
4361Sigor@sysoev.ru nxt_buf_filter(task, f, NULL);
4370Sigor@sysoev.ru }
4380Sigor@sysoev.ru
4390Sigor@sysoev.ru
4400Sigor@sysoev.ru static void
nxt_buf_filter_file_read_error(nxt_task_t * task,void * obj,void * data)4411Sigor@sysoev.ru nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
4420Sigor@sysoev.ru {
4430Sigor@sysoev.ru nxt_buf_filter_t *f;
4440Sigor@sysoev.ru
4450Sigor@sysoev.ru f = data;
4460Sigor@sysoev.ru
4471Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
4481Sigor@sysoev.ru task, f, f->data);
4490Sigor@sysoev.ru }
450