1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
11 nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
12 static void nxt_buf_filter_file_read_start(nxt_task_t *task,
13 nxt_buf_filter_t *f);
14 static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
15 static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
16 void *data);
17 static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
18 void *data);
19 static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
20 void *data);
21
22
23 void
nxt_buf_filter_add(nxt_task_t * task,nxt_buf_filter_t * f,nxt_buf_t * b)24 nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
25 {
26 nxt_buf_chain_add(&f->input, b);
27
28 nxt_buf_filter(task, f, NULL);
29 }
30
31
32 void
nxt_buf_filter(nxt_task_t * task,void * obj,void * data)33 nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
34 {
35 nxt_int_t ret;
36 nxt_buf_t *b;
37 nxt_buf_filter_t *f;
38
39 f = obj;
40
41 nxt_debug(task, "buf filter");
42
43 if (f->done) {
44 return;
45 }
46
47 f->queued = 0;
48
49 for ( ;; ) {
50 /*
51 * f->input is a chain of original incoming buffers: memory,
52 * mapped, file, and sync buffers;
53 * f->current is a currently processed memory buffer or a chain
54 * of memory/file or mapped/file buffers which are read of
55 * or populated from file;
56 * f->output is a chain of output buffers;
57 * f->last is the last output buffer in the chain.
58 */
59
60 b = f->current;
61
62 nxt_debug(task, "buf filter current: %p", b);
63
64 if (b == NULL) {
65
66 if (f->reading) {
67 return;
68 }
69
70 b = f->input;
71
72 nxt_debug(task, "buf filter input: %p", b);
73
74 if (b == NULL) {
75 /*
76 * The end of the input chain, pass
77 * the output chain to the next filter.
78 */
79 nxt_buf_filter_next(f);
80
81 return;
82 }
83
84 if (nxt_buf_is_mem(b)) {
85
86 f->current = b;
87 f->input = b->next;
88 b->next = NULL;
89
90 } else if (nxt_buf_is_file(b)) {
91
92 if (f->run->filter_ready(f) != NXT_OK) {
93 nxt_buf_filter_next(f);
94 }
95
96 nxt_buf_filter_file_read_start(task, f);
97 return;
98 }
99 }
100
101 if (nxt_buf_is_sync(b)) {
102
103 ret = NXT_OK;
104 f->current = b;
105 f->input = b->next;
106 b->next = NULL;
107
108 if (nxt_buf_is_nobuf(b)) {
109 ret = f->run->filter_sync_nobuf(f);
110
111 } else if (nxt_buf_is_flush(b)) {
112 ret = f->run->filter_sync_flush(f);
113
114 } else if (nxt_buf_is_last(b)) {
115 ret = f->run->filter_sync_last(f);
116
117 f->done = (ret == NXT_OK);
118 }
119
120 if (nxt_fast_path(ret == NXT_OK)) {
121 continue;
122 }
123
124 if (nxt_slow_path(ret == NXT_ERROR)) {
125 goto fail;
126 }
127
128 /* ret == NXT_AGAIN: No filter internal buffers available. */
129 goto nobuf;
130 }
131
132 ret = f->run->filter_process(f);
133
134 if (nxt_fast_path(ret == NXT_OK)) {
135 b = f->current;
136 /*
137 * A filter may just move f->current to f->output
138 * and then set f->current to NULL.
139 */
140 if (b != NULL && b->mem.pos == b->mem.free) {
141 f->current = b->next;
142 nxt_thread_work_queue_add(task->thread, f->work_queue,
143 b->completion_handler,
144 task, b, b->parent);
145 }
146
147 continue;
148 }
149
150 if (nxt_slow_path(ret == NXT_ERROR)) {
151 goto fail;
152 }
153
154 /* ret == NXT_AGAIN: No filter internal buffers available. */
155 goto nobuf;
156 }
157
158 nobuf:
159
160 /* ret == NXT_AGAIN: No filter internal buffers available. */
161
162 if (nxt_buf_filter_nobuf(f) == NXT_OK) {
163 return;
164 }
165
166 fail:
167
168 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
169 task, f, f->data);
170 }
171
172
173 static nxt_int_t
nxt_buf_filter_nobuf(nxt_buf_filter_t * f)174 nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
175 {
176 nxt_buf_t *b;
177
178 nxt_thread_log_debug("buf filter nobuf");
179
180 b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
181
182 if (nxt_fast_path(b != NULL)) {
183
184 nxt_buf_chain_add(&f->output, b);
185 f->last = NULL;
186
187 f->run->filter_next(f);
188
189 f->output = NULL;
190
191 return NXT_OK;
192 }
193
194 return NXT_ERROR;
195 }
196
197
198 nxt_inline void
nxt_buf_filter_next(nxt_buf_filter_t * f)199 nxt_buf_filter_next(nxt_buf_filter_t *f)
200 {
201 if (f->output != NULL) {
202 f->last = NULL;
203
204 f->run->filter_next(f);
205 f->output = NULL;
206 }
207 }
208
209
210 void
nxt_buf_filter_enqueue(nxt_task_t * task,nxt_buf_filter_t * f)211 nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
212 {
213 nxt_debug(task, "buf filter enqueue: %d", f->queued);
214
215 if (!f->queued && !f->done) {
216 f->queued = 1;
217 nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
218 task, f, NULL);
219 }
220 }
221
222
223 static void
nxt_buf_filter_file_read_start(nxt_task_t * task,nxt_buf_filter_t * f)224 nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
225 {
226 nxt_job_file_t *jbf;
227 nxt_buf_filter_file_t *ff;
228
229 ff = f->run->job_file_create(f);
230
231 if (nxt_slow_path(ff == NULL)) {
232 nxt_thread_work_queue_add(task->thread, f->work_queue,
233 f->run->filter_error,
234 task, f, f->data);
235 return;
236 }
237
238 f->filter_file = ff;
239
240 jbf = &ff->job_file;
241 jbf->file = *f->input->file;
242
243 jbf->ready_handler = nxt_buf_filter_file_job_completion;
244 jbf->error_handler = nxt_buf_filter_file_read_error;
245
246 nxt_job_set_name(&jbf->job, "buf filter job file");
247
248 f->reading = 1;
249
250 nxt_buf_filter_file_read(task, f);
251 }
252
253
254 static void
nxt_buf_filter_file_read(nxt_task_t * task,nxt_buf_filter_t * f)255 nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
256 {
257 nxt_int_t ret;
258 nxt_off_t size;
259 nxt_buf_t *b;
260 nxt_buf_filter_file_t *ff;
261
262 ff = f->filter_file;
263
264 if (ff->job_file.buffer != NULL) {
265 /* File is now being read. */
266 return;
267 }
268
269 size = f->input->file_end - f->input->file_pos;
270
271 if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
272 /*
273 * Small size value is a hint for buffer pool allocation
274 * size, but if size of the size_t type is lesser than size
275 * of the nxt_off_t type, the large size value may be truncated,
276 * so use a default buffer pool allocation size.
277 */
278 size = 0;
279 }
280
281 if (f->mmap) {
282 ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
283
284 } else {
285 ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
286 }
287
288 if (nxt_fast_path(ret == NXT_OK)) {
289 b = ff->buffers.current;
290
291 b->file_pos = f->input->file_pos;
292 b->file_end = f->input->file_pos;
293 b->file = f->input->file;
294
295 ff->job_file.buffer = b;
296 ff->job_file.offset = f->input->file_pos;
297
298 f->run->job_file_retain(f);
299
300 nxt_job_file_read(task, &ff->job_file.job);
301 return;
302 }
303
304 if (nxt_fast_path(ret != NXT_ERROR)) {
305
306 /* ret == NXT_AGAIN: No buffers available. */
307
308 if (f->buffering) {
309 f->buffering = 0;
310
311 if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
312 return;
313 }
314
315 } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
316 return;
317 }
318 }
319
320 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
321 task, f, f->data);
322 }
323
324
325 typedef struct {
326 nxt_buf_filter_t *filter;
327 nxt_buf_t *buf;
328 } nxt_buf_filter_ctx_t;
329
330
331 static void
nxt_buf_filter_file_job_completion(nxt_task_t * task,void * obj,void * data)332 nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
333 {
334 nxt_buf_t *b;
335 nxt_bool_t done;
336 nxt_job_file_t *jbf;
337 nxt_buf_filter_t *f;
338 nxt_buf_filter_ctx_t *ctx;
339
340 jbf = obj;
341 f = data;
342 b = jbf->buffer;
343 jbf->buffer = NULL;
344
345 nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
346 jbf->file.name, b->file_pos, b->file_end);
347
348 f->run->job_file_release(f);
349
350 ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
351 if (nxt_slow_path(ctx == NULL)) {
352 goto fail;
353 }
354
355 ctx->filter = f;
356 ctx->buf = f->input;
357
358 f->input->file_pos = b->file_end;
359
360 done = (f->input->file_pos == f->input->file_end);
361
362 if (done) {
363 f->input = f->input->next;
364 f->reading = 0;
365 }
366
367 b->data = f->data;
368 b->completion_handler = nxt_buf_filter_buf_completion;
369 b->parent = (nxt_buf_t *) ctx;
370 b->next = NULL;
371
372 nxt_buf_chain_add(&f->current, b);
373
374 nxt_buf_filter(task, f, NULL);
375
376 if (b->mem.pos == b->mem.free) {
377 /*
378 * The buffer has been completely processed by nxt_buf_filter(),
379 * its completion handler has been placed in workqueue and
380 * nxt_buf_filter_buf_completion() should be eventually called.
381 */
382 return;
383 }
384
385 if (!done) {
386 /* Try to allocate another buffer and read the next file part. */
387 nxt_buf_filter_file_read(task, f);
388 }
389
390 return;
391
392 fail:
393
394 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
395 task, f, f->data);
396 }
397
398
399 static void
nxt_buf_filter_buf_completion(nxt_task_t * task,void * obj,void * data)400 nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
401 {
402 nxt_buf_t *fb, *b;
403 nxt_buf_filter_t *f;
404 nxt_buf_filter_ctx_t *ctx;
405
406 b = obj;
407 ctx = data;
408 f = ctx->filter;
409
410 nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
411 b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
412
413 /* nxt_http_send_filter() might clear a buffer's file status. */
414 b->is_file = 1;
415
416 fb = ctx->buf;
417
418 nxt_mp_free(f->mem_pool, ctx);
419 nxt_buf_pool_free(&f->filter_file->buffers, b);
420
421 if (fb->file_pos < fb->file_end) {
422 nxt_buf_filter_file_read(task, f);
423 return;
424 }
425
426 if (b->file_end == fb->file_end) {
427 nxt_buf_pool_destroy(&f->filter_file->buffers);
428
429 nxt_job_destroy(&f->filter_file->job_file.job);
430
431 nxt_thread_work_queue_add(task->thread, f->work_queue,
432 fb->completion_handler,
433 task, fb, fb->parent);
434 }
435
436 nxt_buf_filter(task, f, NULL);
437 }
438
439
440 static void
nxt_buf_filter_file_read_error(nxt_task_t * task,void * obj,void * data)441 nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
442 {
443 nxt_buf_filter_t *f;
444
445 f = data;
446
447 nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
448 task, f, f->data);
449 }
450