Back to home page

Nginx displayed by LXR

Source navigation ]
Diff markup ]
Identifier search ]
general search ]
 
 
Version: nginx-1.13.12 ]​[ nginx-1.12.2 ]​

0001 
0002 /*
0003  * Copyright (C) Igor Sysoev
0004  * Copyright (C) Nginx, Inc.
0005  */
0006 
0007 
0008 #include <ngx_config.h>
0009 #include <ngx_core.h>
0010 #include <ngx_event.h>
0011 #include <ngx_event_pipe.h>
0012 
0013 
0014 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
0015 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
0016 
0017 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
0018 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
0019 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
0020 
0021 
0022 ngx_int_t
0023 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
0024 {
0025     ngx_int_t     rc;
0026     ngx_uint_t    flags;
0027     ngx_event_t  *rev, *wev;
0028 
0029     for ( ;; ) {
0030         if (do_write) {
0031             p->log->action = "sending to client";
0032 
0033             rc = ngx_event_pipe_write_to_downstream(p);
0034 
0035             if (rc == NGX_ABORT) {
0036                 return NGX_ABORT;
0037             }
0038 
0039             if (rc == NGX_BUSY) {
0040                 return NGX_OK;
0041             }
0042         }
0043 
0044         p->read = 0;
0045         p->upstream_blocked = 0;
0046 
0047         p->log->action = "reading upstream";
0048 
0049         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
0050             return NGX_ABORT;
0051         }
0052 
0053         if (!p->read && !p->upstream_blocked) {
0054             break;
0055         }
0056 
0057         do_write = 1;
0058     }
0059 
0060     if (p->upstream->fd != (ngx_socket_t) -1) {
0061         rev = p->upstream->read;
0062 
0063         flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
0064 
0065         if (ngx_handle_read_event(rev, flags) != NGX_OK) {
0066             return NGX_ABORT;
0067         }
0068 
0069         if (!rev->delayed) {
0070             if (rev->active && !rev->ready) {
0071                 ngx_add_timer(rev, p->read_timeout);
0072 
0073             } else if (rev->timer_set) {
0074                 ngx_del_timer(rev);
0075             }
0076         }
0077     }
0078 
0079     if (p->downstream->fd != (ngx_socket_t) -1
0080         && p->downstream->data == p->output_ctx)
0081     {
0082         wev = p->downstream->write;
0083         if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
0084             return NGX_ABORT;
0085         }
0086 
0087         if (!wev->delayed) {
0088             if (wev->active && !wev->ready) {
0089                 ngx_add_timer(wev, p->send_timeout);
0090 
0091             } else if (wev->timer_set) {
0092                 ngx_del_timer(wev);
0093             }
0094         }
0095     }
0096 
0097     return NGX_OK;
0098 }
0099 
0100 
0101 static ngx_int_t
0102 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
0103 {
0104     off_t         limit;
0105     ssize_t       n, size;
0106     ngx_int_t     rc;
0107     ngx_buf_t    *b;
0108     ngx_msec_t    delay;
0109     ngx_chain_t  *chain, *cl, *ln;
0110 
0111     if (p->upstream_eof || p->upstream_error || p->upstream_done) {
0112         return NGX_OK;
0113     }
0114 
0115 #if (NGX_THREADS)
0116 
0117     if (p->aio) {
0118         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0119                        "pipe read upstream: aio");
0120         return NGX_AGAIN;
0121     }
0122 
0123     if (p->writing) {
0124         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0125                        "pipe read upstream: writing");
0126 
0127         rc = ngx_event_pipe_write_chain_to_temp_file(p);
0128 
0129         if (rc != NGX_OK) {
0130             return rc;
0131         }
0132     }
0133 
0134 #endif
0135 
0136     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0137                    "pipe read upstream: %d", p->upstream->read->ready);
0138 
0139     for ( ;; ) {
0140 
0141         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
0142             break;
0143         }
0144 
0145         if (p->preread_bufs == NULL && !p->upstream->read->ready) {
0146             break;
0147         }
0148 
0149         if (p->preread_bufs) {
0150 
0151             /* use the pre-read bufs if they exist */
0152 
0153             chain = p->preread_bufs;
0154             p->preread_bufs = NULL;
0155             n = p->preread_size;
0156 
0157             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0158                            "pipe preread: %z", n);
0159 
0160             if (n) {
0161                 p->read = 1;
0162             }
0163 
0164         } else {
0165 
0166 #if (NGX_HAVE_KQUEUE)
0167 
0168             /*
0169              * kqueue notifies about the end of file or a pending error.
0170              * This test allows not to allocate a buf on these conditions
0171              * and not to call c->recv_chain().
0172              */
0173 
0174             if (p->upstream->read->available == 0
0175                 && p->upstream->read->pending_eof)
0176             {
0177                 p->upstream->read->ready = 0;
0178                 p->upstream->read->eof = 1;
0179                 p->upstream_eof = 1;
0180                 p->read = 1;
0181 
0182                 if (p->upstream->read->kq_errno) {
0183                     p->upstream->read->error = 1;
0184                     p->upstream_error = 1;
0185                     p->upstream_eof = 0;
0186 
0187                     ngx_log_error(NGX_LOG_ERR, p->log,
0188                                   p->upstream->read->kq_errno,
0189                                   "kevent() reported that upstream "
0190                                   "closed connection");
0191                 }
0192 
0193                 break;
0194             }
0195 #endif
0196 
0197             if (p->limit_rate) {
0198                 if (p->upstream->read->delayed) {
0199                     break;
0200                 }
0201 
0202                 limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
0203                         - p->read_length;
0204 
0205                 if (limit <= 0) {
0206                     p->upstream->read->delayed = 1;
0207                     delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
0208                     ngx_add_timer(p->upstream->read, delay);
0209                     break;
0210                 }
0211 
0212             } else {
0213                 limit = 0;
0214             }
0215 
0216             if (p->free_raw_bufs) {
0217 
0218                 /* use the free bufs if they exist */
0219 
0220                 chain = p->free_raw_bufs;
0221                 if (p->single_buf) {
0222                     p->free_raw_bufs = p->free_raw_bufs->next;
0223                     chain->next = NULL;
0224                 } else {
0225                     p->free_raw_bufs = NULL;
0226                 }
0227 
0228             } else if (p->allocated < p->bufs.num) {
0229 
0230                 /* allocate a new buf if it's still allowed */
0231 
0232                 b = ngx_create_temp_buf(p->pool, p->bufs.size);
0233                 if (b == NULL) {
0234                     return NGX_ABORT;
0235                 }
0236 
0237                 p->allocated++;
0238 
0239                 chain = ngx_alloc_chain_link(p->pool);
0240                 if (chain == NULL) {
0241                     return NGX_ABORT;
0242                 }
0243 
0244                 chain->buf = b;
0245                 chain->next = NULL;
0246 
0247             } else if (!p->cacheable
0248                        && p->downstream->data == p->output_ctx
0249                        && p->downstream->write->ready
0250                        && !p->downstream->write->delayed)
0251             {
0252                 /*
0253                  * if the bufs are not needed to be saved in a cache and
0254                  * a downstream is ready then write the bufs to a downstream
0255                  */
0256 
0257                 p->upstream_blocked = 1;
0258 
0259                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0260                                "pipe downstream ready");
0261 
0262                 break;
0263 
0264             } else if (p->cacheable
0265                        || p->temp_file->offset < p->max_temp_file_size)
0266             {
0267 
0268                 /*
0269                  * if it is allowed, then save some bufs from p->in
0270                  * to a temporary file, and add them to a p->out chain
0271                  */
0272 
0273                 rc = ngx_event_pipe_write_chain_to_temp_file(p);
0274 
0275                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0276                                "pipe temp offset: %O", p->temp_file->offset);
0277 
0278                 if (rc == NGX_BUSY) {
0279                     break;
0280                 }
0281 
0282                 if (rc != NGX_OK) {
0283                     return rc;
0284                 }
0285 
0286                 chain = p->free_raw_bufs;
0287                 if (p->single_buf) {
0288                     p->free_raw_bufs = p->free_raw_bufs->next;
0289                     chain->next = NULL;
0290                 } else {
0291                     p->free_raw_bufs = NULL;
0292                 }
0293 
0294             } else {
0295 
0296                 /* there are no bufs to read in */
0297 
0298                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0299                                "no pipe bufs to read in");
0300 
0301                 break;
0302             }
0303 
0304             n = p->upstream->recv_chain(p->upstream, chain, limit);
0305 
0306             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0307                            "pipe recv chain: %z", n);
0308 
0309             if (p->free_raw_bufs) {
0310                 chain->next = p->free_raw_bufs;
0311             }
0312             p->free_raw_bufs = chain;
0313 
0314             if (n == NGX_ERROR) {
0315                 p->upstream_error = 1;
0316                 break;
0317             }
0318 
0319             if (n == NGX_AGAIN) {
0320                 if (p->single_buf) {
0321                     ngx_event_pipe_remove_shadow_links(chain->buf);
0322                 }
0323 
0324                 break;
0325             }
0326 
0327             p->read = 1;
0328 
0329             if (n == 0) {
0330                 p->upstream_eof = 1;
0331                 break;
0332             }
0333         }
0334 
0335         delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
0336 
0337         p->read_length += n;
0338         cl = chain;
0339         p->free_raw_bufs = NULL;
0340 
0341         while (cl && n > 0) {
0342 
0343             ngx_event_pipe_remove_shadow_links(cl->buf);
0344 
0345             size = cl->buf->end - cl->buf->last;
0346 
0347             if (n >= size) {
0348                 cl->buf->last = cl->buf->end;
0349 
0350                 /* STUB */ cl->buf->num = p->num++;
0351 
0352                 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
0353                     return NGX_ABORT;
0354                 }
0355 
0356                 n -= size;
0357                 ln = cl;
0358                 cl = cl->next;
0359                 ngx_free_chain(p->pool, ln);
0360 
0361             } else {
0362                 cl->buf->last += n;
0363                 n = 0;
0364             }
0365         }
0366 
0367         if (cl) {
0368             for (ln = cl; ln->next; ln = ln->next) { /* void */ }
0369 
0370             ln->next = p->free_raw_bufs;
0371             p->free_raw_bufs = cl;
0372         }
0373 
0374         if (delay > 0) {
0375             p->upstream->read->delayed = 1;
0376             ngx_add_timer(p->upstream->read, delay);
0377             break;
0378         }
0379     }
0380 
0381 #if (NGX_DEBUG)
0382 
0383     for (cl = p->busy; cl; cl = cl->next) {
0384         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
0385                        "pipe buf busy s:%d t:%d f:%d "
0386                        "%p, pos %p, size: %z "
0387                        "file: %O, size: %O",
0388                        (cl->buf->shadow ? 1 : 0),
0389                        cl->buf->temporary, cl->buf->in_file,
0390                        cl->buf->start, cl->buf->pos,
0391                        cl->buf->last - cl->buf->pos,
0392                        cl->buf->file_pos,
0393                        cl->buf->file_last - cl->buf->file_pos);
0394     }
0395 
0396     for (cl = p->out; cl; cl = cl->next) {
0397         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
0398                        "pipe buf out  s:%d t:%d f:%d "
0399                        "%p, pos %p, size: %z "
0400                        "file: %O, size: %O",
0401                        (cl->buf->shadow ? 1 : 0),
0402                        cl->buf->temporary, cl->buf->in_file,
0403                        cl->buf->start, cl->buf->pos,
0404                        cl->buf->last - cl->buf->pos,
0405                        cl->buf->file_pos,
0406                        cl->buf->file_last - cl->buf->file_pos);
0407     }
0408 
0409     for (cl = p->in; cl; cl = cl->next) {
0410         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
0411                        "pipe buf in   s:%d t:%d f:%d "
0412                        "%p, pos %p, size: %z "
0413                        "file: %O, size: %O",
0414                        (cl->buf->shadow ? 1 : 0),
0415                        cl->buf->temporary, cl->buf->in_file,
0416                        cl->buf->start, cl->buf->pos,
0417                        cl->buf->last - cl->buf->pos,
0418                        cl->buf->file_pos,
0419                        cl->buf->file_last - cl->buf->file_pos);
0420     }
0421 
0422     for (cl = p->free_raw_bufs; cl; cl = cl->next) {
0423         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
0424                        "pipe buf free s:%d t:%d f:%d "
0425                        "%p, pos %p, size: %z "
0426                        "file: %O, size: %O",
0427                        (cl->buf->shadow ? 1 : 0),
0428                        cl->buf->temporary, cl->buf->in_file,
0429                        cl->buf->start, cl->buf->pos,
0430                        cl->buf->last - cl->buf->pos,
0431                        cl->buf->file_pos,
0432                        cl->buf->file_last - cl->buf->file_pos);
0433     }
0434 
0435     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0436                    "pipe length: %O", p->length);
0437 
0438 #endif
0439 
0440     if (p->free_raw_bufs && p->length != -1) {
0441         cl = p->free_raw_bufs;
0442 
0443         if (cl->buf->last - cl->buf->pos >= p->length) {
0444 
0445             p->free_raw_bufs = cl->next;
0446 
0447             /* STUB */ cl->buf->num = p->num++;
0448 
0449             if (p->input_filter(p, cl->buf) == NGX_ERROR) {
0450                 return NGX_ABORT;
0451             }
0452 
0453             ngx_free_chain(p->pool, cl);
0454         }
0455     }
0456 
0457     if (p->length == 0) {
0458         p->upstream_done = 1;
0459         p->read = 1;
0460     }
0461 
0462     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
0463 
0464         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
0465 
0466         if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
0467             return NGX_ABORT;
0468         }
0469 
0470         p->free_raw_bufs = p->free_raw_bufs->next;
0471 
0472         if (p->free_bufs && p->buf_to_file == NULL) {
0473             for (cl = p->free_raw_bufs; cl; cl = cl->next) {
0474                 if (cl->buf->shadow == NULL) {
0475                     ngx_pfree(p->pool, cl->buf->start);
0476                 }
0477             }
0478         }
0479     }
0480 
0481     if (p->cacheable && (p->in || p->buf_to_file)) {
0482 
0483         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0484                        "pipe write chain");
0485 
0486         rc = ngx_event_pipe_write_chain_to_temp_file(p);
0487 
0488         if (rc != NGX_OK) {
0489             return rc;
0490         }
0491     }
0492 
0493     return NGX_OK;
0494 }
0495 
0496 
0497 static ngx_int_t
0498 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
0499 {
0500     u_char            *prev;
0501     size_t             bsize;
0502     ngx_int_t          rc;
0503     ngx_uint_t         flush, flushed, prev_last_shadow;
0504     ngx_chain_t       *out, **ll, *cl;
0505     ngx_connection_t  *downstream;
0506 
0507     downstream = p->downstream;
0508 
0509     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0510                    "pipe write downstream: %d", downstream->write->ready);
0511 
0512 #if (NGX_THREADS)
0513 
0514     if (p->writing) {
0515         rc = ngx_event_pipe_write_chain_to_temp_file(p);
0516 
0517         if (rc == NGX_ABORT) {
0518             return NGX_ABORT;
0519         }
0520     }
0521 
0522 #endif
0523 
0524     flushed = 0;
0525 
0526     for ( ;; ) {
0527         if (p->downstream_error) {
0528             return ngx_event_pipe_drain_chains(p);
0529         }
0530 
0531         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
0532 
0533             /* pass the p->out and p->in chains to the output filter */
0534 
0535             for (cl = p->busy; cl; cl = cl->next) {
0536                 cl->buf->recycled = 0;
0537             }
0538 
0539             if (p->out) {
0540                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0541                                "pipe write downstream flush out");
0542 
0543                 for (cl = p->out; cl; cl = cl->next) {
0544                     cl->buf->recycled = 0;
0545                 }
0546 
0547                 rc = p->output_filter(p->output_ctx, p->out);
0548 
0549                 if (rc == NGX_ERROR) {
0550                     p->downstream_error = 1;
0551                     return ngx_event_pipe_drain_chains(p);
0552                 }
0553 
0554                 p->out = NULL;
0555             }
0556 
0557             if (p->writing) {
0558                 break;
0559             }
0560 
0561             if (p->in) {
0562                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0563                                "pipe write downstream flush in");
0564 
0565                 for (cl = p->in; cl; cl = cl->next) {
0566                     cl->buf->recycled = 0;
0567                 }
0568 
0569                 rc = p->output_filter(p->output_ctx, p->in);
0570 
0571                 if (rc == NGX_ERROR) {
0572                     p->downstream_error = 1;
0573                     return ngx_event_pipe_drain_chains(p);
0574                 }
0575 
0576                 p->in = NULL;
0577             }
0578 
0579             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
0580                            "pipe write downstream done");
0581 
0582             /* TODO: free unused bufs */
0583 
0584             p->downstream_done = 1;
0585             break;
0586         }
0587 
0588         if (downstream->data != p->output_ctx
0589             || !downstream->write->ready
0590             || downstream->write->delayed)
0591         {
0592             break;
0593         }
0594 
0595         /* bsize is the size of the busy recycled bufs */
0596 
0597         prev = NULL;
0598         bsize = 0;
0599 
0600         for (cl = p->busy; cl; cl = cl->next) {
0601 
0602             if (cl->buf->recycled) {
0603                 if (prev == cl->buf->start) {
0604                     continue;
0605                 }
0606 
0607                 bsize += cl->buf->end - cl->buf->start;
0608                 prev = cl->buf->start;
0609             }
0610         }
0611 
0612         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0613                        "pipe write busy: %uz", bsize);
0614 
0615         out = NULL;
0616 
0617         if (bsize >= (size_t) p->busy_size) {
0618             flush = 1;
0619             goto flush;
0620         }
0621 
0622         flush = 0;
0623         ll = NULL;
0624         prev_last_shadow = 1;
0625 
0626         for ( ;; ) {
0627             if (p->out) {
0628                 cl = p->out;
0629 
0630                 if (cl->buf->recycled) {
0631                     ngx_log_error(NGX_LOG_ALERT, p->log, 0,
0632                                   "recycled buffer in pipe out chain");
0633                 }
0634 
0635                 p->out = p->out->next;
0636 
0637             } else if (!p->cacheable && !p->writing && p->in) {
0638                 cl = p->in;
0639 
0640                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
0641                                "pipe write buf ls:%d %p %z",
0642                                cl->buf->last_shadow,
0643                                cl->buf->pos,
0644                                cl->buf->last - cl->buf->pos);
0645 
0646                 if (cl->buf->recycled && prev_last_shadow) {
0647                     if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
0648                         flush = 1;
0649                         break;
0650                     }
0651 
0652                     bsize += cl->buf->end - cl->buf->start;
0653                 }
0654 
0655                 prev_last_shadow = cl->buf->last_shadow;
0656 
0657                 p->in = p->in->next;
0658 
0659             } else {
0660                 break;
0661             }
0662 
0663             cl->next = NULL;
0664 
0665             if (out) {
0666                 *ll = cl;
0667             } else {
0668                 out = cl;
0669             }
0670             ll = &cl->next;
0671         }
0672 
0673     flush:
0674 
0675         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
0676                        "pipe write: out:%p, f:%ui", out, flush);
0677 
0678         if (out == NULL) {
0679 
0680             if (!flush) {
0681                 break;
0682             }
0683 
0684             /* a workaround for AIO */
0685             if (flushed++ > 10) {
0686                 return NGX_BUSY;
0687             }
0688         }
0689 
0690         rc = p->output_filter(p->output_ctx, out);
0691 
0692         ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
0693 
0694         if (rc == NGX_ERROR) {
0695             p->downstream_error = 1;
0696             return ngx_event_pipe_drain_chains(p);
0697         }
0698 
0699         for (cl = p->free; cl; cl = cl->next) {
0700 
0701             if (cl->buf->temp_file) {
0702                 if (p->cacheable || !p->cyclic_temp_file) {
0703                     continue;
0704                 }
0705 
0706                 /* reset p->temp_offset if all bufs had been sent */
0707 
0708                 if (cl->buf->file_last == p->temp_file->offset) {
0709                     p->temp_file->offset = 0;
0710                 }
0711             }
0712 
0713             /* TODO: free buf if p->free_bufs && upstream done */
0714 
0715             /* add the free shadow raw buf to p->free_raw_bufs */
0716 
0717             if (cl->buf->last_shadow) {
0718                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
0719                     return NGX_ABORT;
0720                 }
0721 
0722                 cl->buf->last_shadow = 0;
0723             }
0724 
0725             cl->buf->shadow = NULL;
0726         }
0727     }
0728 
0729     return NGX_OK;
0730 }
0731 
0732 
0733 static ngx_int_t
0734 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
0735 {
0736     ssize_t       size, bsize, n;
0737     ngx_buf_t    *b;
0738     ngx_uint_t    prev_last_shadow;
0739     ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;
0740 
0741 #if (NGX_THREADS)
0742 
0743     if (p->writing) {
0744 
0745         if (p->aio) {
0746             return NGX_AGAIN;
0747         }
0748 
0749         out = p->writing;
0750         p->writing = NULL;
0751 
0752         n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
0753 
0754         if (n == NGX_ERROR) {
0755             return NGX_ABORT;
0756         }
0757 
0758         goto done;
0759     }
0760 
0761 #endif
0762 
0763     if (p->buf_to_file) {
0764         out = ngx_alloc_chain_link(p->pool);
0765         if (out == NULL) {
0766             return NGX_ABORT;
0767         }
0768 
0769         out->buf = p->buf_to_file;
0770         out->next = p->in;
0771 
0772     } else {
0773         out = p->in;
0774     }
0775 
0776     if (!p->cacheable) {
0777 
0778         size = 0;
0779         cl = out;
0780         ll = NULL;
0781         prev_last_shadow = 1;
0782 
0783         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
0784                        "pipe offset: %O", p->temp_file->offset);
0785 
0786         do {
0787             bsize = cl->buf->last - cl->buf->pos;
0788 
0789             ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
0790                            "pipe buf ls:%d %p, pos %p, size: %z",
0791                            cl->buf->last_shadow, cl->buf->start,
0792                            cl->buf->pos, bsize);
0793 
0794             if (prev_last_shadow
0795                 && ((size + bsize > p->temp_file_write_size)
0796                     || (p->temp_file->offset + size + bsize
0797                         > p->max_temp_file_size)))
0798             {
0799                 break;
0800             }
0801 
0802             prev_last_shadow = cl->buf->last_shadow;
0803 
0804             size += bsize;
0805             ll = &cl->next;
0806             cl = cl->next;
0807 
0808         } while (cl);
0809 
0810         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
0811 
0812         if (ll == NULL) {
0813             return NGX_BUSY;
0814         }
0815 
0816         if (cl) {
0817             p->in = cl;
0818             *ll = NULL;
0819 
0820         } else {
0821             p->in = NULL;
0822             p->last_in = &p->in;
0823         }
0824 
0825     } else {
0826         p->in = NULL;
0827         p->last_in = &p->in;
0828     }
0829 
0830 #if (NGX_THREADS)
0831     if (p->thread_handler) {
0832         p->temp_file->thread_write = 1;
0833         p->temp_file->file.thread_task = p->thread_task;
0834         p->temp_file->file.thread_handler = p->thread_handler;
0835         p->temp_file->file.thread_ctx = p->thread_ctx;
0836     }
0837 #endif
0838 
0839     n = ngx_write_chain_to_temp_file(p->temp_file, out);
0840 
0841     if (n == NGX_ERROR) {
0842         return NGX_ABORT;
0843     }
0844 
0845 #if (NGX_THREADS)
0846 
0847     if (n == NGX_AGAIN) {
0848         p->writing = out;
0849         p->thread_task = p->temp_file->file.thread_task;
0850         return NGX_AGAIN;
0851     }
0852 
0853 done:
0854 
0855 #endif
0856 
0857     if (p->buf_to_file) {
0858         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
0859         n -= p->buf_to_file->last - p->buf_to_file->pos;
0860         p->buf_to_file = NULL;
0861         out = out->next;
0862     }
0863 
0864     if (n > 0) {
0865         /* update previous buffer or add new buffer */
0866 
0867         if (p->out) {
0868             for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
0869 
0870             b = cl->buf;
0871 
0872             if (b->file_last == p->temp_file->offset) {
0873                 p->temp_file->offset += n;
0874                 b->file_last = p->temp_file->offset;
0875                 goto free;
0876             }
0877 
0878             last_out = &cl->next;
0879 
0880         } else {
0881             last_out = &p->out;
0882         }
0883 
0884         cl = ngx_chain_get_free_buf(p->pool, &p->free);
0885         if (cl == NULL) {
0886             return NGX_ABORT;
0887         }
0888 
0889         b = cl->buf;
0890 
0891         ngx_memzero(b, sizeof(ngx_buf_t));
0892 
0893         b->tag = p->tag;
0894 
0895         b->file = &p->temp_file->file;
0896         b->file_pos = p->temp_file->offset;
0897         p->temp_file->offset += n;
0898         b->file_last = p->temp_file->offset;
0899 
0900         b->in_file = 1;
0901         b->temp_file = 1;
0902 
0903         *last_out = cl;
0904     }
0905 
0906 free:
0907 
0908     for (last_free = &p->free_raw_bufs;
0909          *last_free != NULL;
0910          last_free = &(*last_free)->next)
0911     {
0912         /* void */
0913     }
0914 
0915     for (cl = out; cl; cl = next) {
0916         next = cl->next;
0917 
0918         cl->next = p->free;
0919         p->free = cl;
0920 
0921         b = cl->buf;
0922 
0923         if (b->last_shadow) {
0924 
0925             tl = ngx_alloc_chain_link(p->pool);
0926             if (tl == NULL) {
0927                 return NGX_ABORT;
0928             }
0929 
0930             tl->buf = b->shadow;
0931             tl->next = NULL;
0932 
0933             *last_free = tl;
0934             last_free = &tl->next;
0935 
0936             b->shadow->pos = b->shadow->start;
0937             b->shadow->last = b->shadow->start;
0938 
0939             ngx_event_pipe_remove_shadow_links(b->shadow);
0940         }
0941     }
0942 
0943     return NGX_OK;
0944 }
0945 
0946 
0947 /* the copy input filter */
0948 
0949 ngx_int_t
0950 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
0951 {
0952     ngx_buf_t    *b;
0953     ngx_chain_t  *cl;
0954 
0955     if (buf->pos == buf->last) {
0956         return NGX_OK;
0957     }
0958 
0959     cl = ngx_chain_get_free_buf(p->pool, &p->free);
0960     if (cl == NULL) {
0961         return NGX_ERROR;
0962     }
0963 
0964     b = cl->buf;
0965 
0966     ngx_memcpy(b, buf, sizeof(ngx_buf_t));
0967     b->shadow = buf;
0968     b->tag = p->tag;
0969     b->last_shadow = 1;
0970     b->recycled = 1;
0971     buf->shadow = b;
0972 
0973     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
0974 
0975     if (p->in) {
0976         *p->last_in = cl;
0977     } else {
0978         p->in = cl;
0979     }
0980     p->last_in = &cl->next;
0981 
0982     if (p->length == -1) {
0983         return NGX_OK;
0984     }
0985 
0986     p->length -= b->last - b->pos;
0987 
0988     return NGX_OK;
0989 }
0990 
0991 
0992 static ngx_inline void
0993 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
0994 {
0995     ngx_buf_t  *b, *next;
0996 
0997     b = buf->shadow;
0998 
0999     if (b == NULL) {
1000         return;
1001     }
1002 
1003     while (!b->last_shadow) {
1004         next = b->shadow;
1005 
1006         b->temporary = 0;
1007         b->recycled = 0;
1008 
1009         b->shadow = NULL;
1010         b = next;
1011     }
1012 
1013     b->temporary = 0;
1014     b->recycled = 0;
1015     b->last_shadow = 0;
1016 
1017     b->shadow = NULL;
1018 
1019     buf->shadow = NULL;
1020 }
1021 
1022 
1023 ngx_int_t
1024 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
1025 {
1026     ngx_chain_t  *cl;
1027 
1028     cl = ngx_alloc_chain_link(p->pool);
1029     if (cl == NULL) {
1030         return NGX_ERROR;
1031     }
1032 
1033     if (p->buf_to_file && b->start == p->buf_to_file->start) {
1034         b->pos = p->buf_to_file->last;
1035         b->last = p->buf_to_file->last;
1036 
1037     } else {
1038         b->pos = b->start;
1039         b->last = b->start;
1040     }
1041 
1042     b->shadow = NULL;
1043 
1044     cl->buf = b;
1045 
1046     if (p->free_raw_bufs == NULL) {
1047         p->free_raw_bufs = cl;
1048         cl->next = NULL;
1049 
1050         return NGX_OK;
1051     }
1052 
1053     if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
1054 
1055         /* add the free buf to the list start */
1056 
1057         cl->next = p->free_raw_bufs;
1058         p->free_raw_bufs = cl;
1059 
1060         return NGX_OK;
1061     }
1062 
1063     /* the first free buf is partially filled, thus add the free buf after it */
1064 
1065     cl->next = p->free_raw_bufs->next;
1066     p->free_raw_bufs->next = cl;
1067 
1068     return NGX_OK;
1069 }
1070 
1071 
1072 static ngx_int_t
1073 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
1074 {
1075     ngx_chain_t  *cl, *tl;
1076 
1077     for ( ;; ) {
1078         if (p->busy) {
1079             cl = p->busy;
1080             p->busy = NULL;
1081 
1082         } else if (p->out) {
1083             cl = p->out;
1084             p->out = NULL;
1085 
1086         } else if (p->in) {
1087             cl = p->in;
1088             p->in = NULL;
1089 
1090         } else {
1091             return NGX_OK;
1092         }
1093 
1094         while (cl) {
1095             if (cl->buf->last_shadow) {
1096                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1097                     return NGX_ABORT;
1098                 }
1099 
1100                 cl->buf->last_shadow = 0;
1101             }
1102 
1103             cl->buf->shadow = NULL;
1104             tl = cl->next;
1105             cl->next = p->free;
1106             p->free = cl;
1107             cl = tl;
1108         }
1109     }
1110 }