1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f); 11 nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f); 12 static void nxt_buf_filter_file_read_start(nxt_thread_t *thr, 13 nxt_buf_filter_t *f); 14 static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f); 15 static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr, 16 void *obj, void *data); 17 static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, 18 void *data); 19 static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, 20 void *data); 21 22 23 void 24 nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b) 25 { 26 nxt_buf_chain_add(&f->input, b); 27 28 nxt_buf_filter(thr, f, NULL); 29 } 30 31 32 void 33 nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) 34 { 35 nxt_int_t ret; 36 nxt_buf_t *b; 37 nxt_buf_filter_t *f; 38 39 f = obj; 40 41 nxt_log_debug(thr->log, "buf filter"); 42 43 if (f->done) { 44 return; 45 } 46 47 f->queued = 0; 48 49 for ( ;; ) { 50 /* 51 * f->input is a chain of original incoming buffers: memory, 52 * mapped, file, and sync buffers; 53 * f->current is a currently processed memory buffer or a chain 54 * of memory/file or mapped/file buffers which are read of 55 * or populated from file; 56 * f->output is a chain of output buffers; 57 * f->last is the last output buffer in the chain. 58 */ 59 60 b = f->current; 61 62 nxt_log_debug(thr->log, "buf filter current: %p", b); 63 64 if (b == NULL) { 65 66 if (f->reading) { 67 return; 68 } 69 70 b = f->input; 71 72 nxt_log_debug(thr->log, "buf filter input: %p", b); 73 74 if (b == NULL) { 75 /* 76 * The end of the input chain, pass 77 * the output chain to the next filter. 78 */ 79 nxt_buf_filter_next(f); 80 81 return; 82 } 83 84 if (nxt_buf_is_mem(b)) { 85 86 f->current = b; 87 f->input = b->next; 88 b->next = NULL; 89 90 } else if (nxt_buf_is_file(b)) { 91 92 if (f->run->filter_ready(f) != NXT_OK) { 93 nxt_buf_filter_next(f); 94 } 95 96 nxt_buf_filter_file_read_start(thr, f); 97 return; 98 } 99 } 100 101 if (nxt_buf_is_sync(b)) { 102 103 ret = NXT_OK; 104 f->current = b; 105 f->input = b->next; 106 b->next = NULL; 107 108 if (nxt_buf_is_nobuf(b)) { 109 ret = f->run->filter_sync_nobuf(f); 110 111 } else if (nxt_buf_is_flush(b)) { 112 ret = f->run->filter_sync_flush(f); 113 114 } else if (nxt_buf_is_last(b)) { 115 ret = f->run->filter_sync_last(f); 116 117 f->done = (ret == NXT_OK); 118 } 119 120 if (nxt_fast_path(ret == NXT_OK)) { 121 continue; 122 } 123 124 if (nxt_slow_path(ret == NXT_ERROR)) { 125 goto fail; 126 } 127 128 /* ret == NXT_AGAIN: No filter internal buffers available. */ 129 goto nobuf; 130 } 131 132 ret = f->run->filter_process(f); 133 134 if (nxt_fast_path(ret == NXT_OK)) { 135 b = f->current; 136 /* 137 * A filter may just move f->current to f->output 138 * and then set f->current to NULL. 139 */ 140 if (b != NULL && b->mem.pos == b->mem.free) { 141 f->current = b->next; 142 nxt_thread_work_queue_add(thr, f->work_queue, 143 b->completion_handler, 144 b, b->parent, thr->log); 145 } 146 147 continue; 148 } 149 150 if (nxt_slow_path(ret == NXT_ERROR)) { 151 goto fail; 152 } 153 154 /* ret == NXT_AGAIN: No filter internal buffers available. */ 155 goto nobuf; 156 } 157 158 nobuf: 159 160 /* ret == NXT_AGAIN: No filter internal buffers available. */ 161 162 if (nxt_buf_filter_nobuf(f) == NXT_OK) { 163 return; 164 } 165 166 fail: 167 168 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, 169 f, f->data, thr->log); 170 } 171 172 173 static nxt_int_t 174 nxt_buf_filter_nobuf(nxt_buf_filter_t *f) 175 { 176 nxt_buf_t *b; 177 178 nxt_thread_log_debug("buf filter nobuf"); 179 180 b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF); 181 182 if (nxt_fast_path(b != NULL)) { 183 184 nxt_buf_chain_add(&f->output, b); 185 f->last = NULL; 186 187 f->run->filter_next(f); 188 189 f->output = NULL; 190 191 return NXT_OK; 192 } 193 194 return NXT_ERROR; 195 } 196 197 198 nxt_inline void 199 nxt_buf_filter_next(nxt_buf_filter_t *f) 200 { 201 if (f->output != NULL) { 202 f->last = NULL; 203 204 f->run->filter_next(f); 205 f->output = NULL; 206 } 207 } 208 209 210 void 211 nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f) 212 { 213 nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued); 214 215 if (!f->queued && !f->done) { 216 f->queued = 1; 217 nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter, 218 f, NULL, thr->log); 219 } 220 } 221 222 223 static void 224 nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f) 225 { 226 nxt_job_file_t *jbf; 227 nxt_buf_filter_file_t *ff; 228 229 ff = f->run->job_file_create(f); 230 231 if (nxt_slow_path(ff == NULL)) { 232 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, 233 f, f->data, thr->log); 234 return; 235 } 236 237 f->filter_file = ff; 238 239 jbf = &ff->job_file; 240 jbf->file = *f->input->file; 241 242 jbf->ready_handler = nxt_buf_filter_file_job_completion; 243 jbf->error_handler = nxt_buf_filter_file_read_error; 244 245 nxt_job_set_name(&jbf->job, "buf filter job file"); 246 247 f->reading = 1; 248 249 nxt_buf_filter_file_read(thr, f); 250 } 251 252 253 static void 254 nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f) 255 { 256 nxt_int_t ret; 257 nxt_off_t size; 258 nxt_buf_t *b; 259 nxt_buf_filter_file_t *ff; 260 261 ff = f->filter_file; 262 263 if (ff->job_file.buffer != NULL) { 264 /* File is now being read. */ 265 return; 266 } 267 268 size = f->input->file_end - f->input->file_pos; 269 270 if (size > (nxt_off_t) NXT_SIZE_T_MAX) { 271 /* 272 * Small size value is a hint for buffer pool allocation 273 * size, but if size of the size_t type is lesser than size 274 * of the nxt_off_t type, the large size value may be truncated, 275 * so use a default buffer pool allocation size. 276 */ 277 size = 0; 278 } 279 280 if (f->mmap) { 281 ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size); 282 283 } else { 284 ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size); 285 } 286 287 if (nxt_fast_path(ret == NXT_OK)) { 288 b = ff->buffers.current; 289 290 b->file_pos = f->input->file_pos; 291 b->file_end = f->input->file_pos; 292 b->file = f->input->file; 293 294 ff->job_file.buffer = b; 295 ff->job_file.offset = f->input->file_pos; 296 297 f->run->job_file_retain(f); 298 299 nxt_job_file_read(thr, &ff->job_file.job); 300 return; 301 } 302 303 if (nxt_fast_path(ret != NXT_ERROR)) { 304 305 /* ret == NXT_AGAIN: No buffers available. */ 306 307 if (f->buffering) { 308 f->buffering = 0; 309 310 if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) { 311 return; 312 } 313 314 } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) { 315 return; 316 } 317 } 318 319 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, 320 f, f->data, thr->log); 321 } 322 323 324 typedef struct { 325 nxt_buf_filter_t *filter; 326 nxt_buf_t *buf; 327 } nxt_buf_filter_ctx_t; 328 329 330 static void 331 nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data) 332 { 333 nxt_buf_t *b; 334 nxt_bool_t done; 335 nxt_job_file_t *jbf; 336 nxt_buf_filter_t *f; 337 nxt_buf_filter_ctx_t *ctx; 338 339 jbf = obj; 340 f = data; 341 b = jbf->buffer; 342 jbf->buffer = NULL; 343 344 nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O", 345 jbf->file.name, b->file_pos, b->file_end); 346 347 f->run->job_file_release(f); 348 349 ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t)); 350 if (nxt_slow_path(ctx == NULL)) { 351 goto fail; 352 } 353 354 ctx->filter = f; 355 ctx->buf = f->input; 356 357 f->input->file_pos = b->file_end; 358 359 done = (f->input->file_pos == f->input->file_end); 360 361 if (done) { 362 f->input = f->input->next; 363 f->reading = 0; 364 } 365 366 b->data = f->data; 367 b->completion_handler = nxt_buf_filter_buf_completion; 368 b->parent = (nxt_buf_t *) ctx; 369 b->next = NULL; 370 371 nxt_buf_chain_add(&f->current, b); 372 373 nxt_buf_filter(thr, f, NULL); 374 375 if (b->mem.pos == b->mem.free) { 376 /* 377 * The buffer has been completely processed by nxt_buf_filter(), 378 * its completion handler has been placed in workqueue and 379 * nxt_buf_filter_buf_completion() should be eventually called. 380 */ 381 return; 382 } 383 384 if (!done) { 385 /* Try to allocate another buffer and read the next file part. */ 386 nxt_buf_filter_file_read(thr, f); 387 } 388 389 return; 390 391 fail: 392 393 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, 394 f, f->data, thr->log); 395 } 396 397 398 static void 399 nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data) 400 { 401 nxt_buf_t *fb, *b; 402 nxt_buf_filter_t *f; 403 nxt_buf_filter_ctx_t *ctx; 404 405 b = obj; 406 ctx = data; 407 f = ctx->filter; 408 409 nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O", 410 b, f->filter_file->job_file.file.name, 411 b->file_pos, b->file_end); 412 413 /* nxt_http_send_filter() might clear a buffer's file status. */ 414 b->is_file = 1; 415 416 fb = ctx->buf; 417 418 nxt_mem_cache_free0(f->mem_pool, ctx, sizeof(nxt_buf_filter_ctx_t)); 419 nxt_buf_pool_free(&f->filter_file->buffers, b); 420 421 if (fb->file_pos < fb->file_end) { 422 nxt_buf_filter_file_read(thr, f); 423 return; 424 } 425 426 if (b->file_end == fb->file_end) { 427 nxt_buf_pool_destroy(&f->filter_file->buffers); 428 429 nxt_job_destroy(&f->filter_file->job_file.job); 430 431 nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler, 432 fb, fb->parent, thr->log); 433 } 434 435 nxt_buf_filter(thr, f, NULL); 436 } 437 438 439 static void 440 nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data) 441 { 442 nxt_buf_filter_t *f; 443 444 f = data; 445 446 nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, 447 f, f->data, thr->log); 448 } 449