~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
Nginx/event/ngx_event_pipe.c

Version: ~ [ nginx-0.8.20 ] ~ [ nginx-0.7.62 ] ~ [ nginx-0.6.39 ] ~

  1 
  2 /*
  3  * Copyright (C) Igor Sysoev
  4  */
  5 
  6 
  7 #include <ngx_config.h>
  8 #include <ngx_core.h>
  9 #include <ngx_event.h>
 10 #include <ngx_event_pipe.h>
 11 
 12 
 13 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
 14 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
 15 
 16 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
 17 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
 18 static ngx_inline void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
 19                                                           ngx_buf_t *buf);
 20 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
 21 
 22 
 23 ngx_int_t
 24 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
 25 {
 26     u_int         flags;
 27     ngx_int_t     rc;
 28     ngx_event_t  *rev, *wev;
 29 
 30     for ( ;; ) {
 31         if (do_write) {
 32             p->log->action = "sending to client";
 33 
 34             rc = ngx_event_pipe_write_to_downstream(p);
 35 
 36             if (rc == NGX_ABORT) {
 37                 return NGX_ABORT;
 38             }
 39 
 40             if (rc == NGX_BUSY) {
 41                 return NGX_OK;
 42             }
 43         }
 44 
 45         p->read = 0;
 46         p->upstream_blocked = 0;
 47 
 48         p->log->action = "reading upstream";
 49 
 50         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
 51             return NGX_ABORT;
 52         }
 53 
 54         if (!p->read && !p->upstream_blocked) {
 55             break;
 56         }
 57 
 58         do_write = 1;
 59     }
 60 
 61     if (p->upstream->fd != -1) {
 62         rev = p->upstream->read;
 63 
 64         flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
 65 
 66         if (ngx_handle_read_event(rev, flags) != NGX_OK) {
 67             return NGX_ABORT;
 68         }
 69 
 70         if (rev->active && !rev->ready) {
 71             ngx_add_timer(rev, p->read_timeout);
 72 
 73         } else if (rev->timer_set) {
 74             ngx_del_timer(rev);
 75         }
 76     }
 77 
 78     if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
 79         wev = p->downstream->write;
 80         if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
 81             return NGX_ABORT;
 82         }
 83 
 84         if (!wev->delayed) {
 85             if (wev->active && !wev->ready) {
 86                 ngx_add_timer(wev, p->send_timeout);
 87 
 88             } else if (wev->timer_set) {
 89                 ngx_del_timer(wev);
 90             }
 91         }
 92     }
 93 
 94     return NGX_OK;
 95 }
 96 
 97 
 98 static ngx_int_t
 99 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
100 {
101     ssize_t       n, size;
102     ngx_int_t     rc;
103     ngx_buf_t    *b;
104     ngx_chain_t  *chain, *cl, *ln;
105 
106     if (p->upstream_eof || p->upstream_error || p->upstream_done) {
107         return NGX_OK;
108     }
109 
110     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
111                    "pipe read upstream: %d", p->upstream->read->ready);
112 
113     for ( ;; ) {
114 
115         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
116             break;
117         }
118 
119         if (p->preread_bufs == NULL && !p->upstream->read->ready) {
120             break;
121         }
122 
123         if (p->preread_bufs) {
124 
125             /* use the pre-read bufs if they exist */
126 
127             chain = p->preread_bufs;
128             p->preread_bufs = NULL;
129             n = p->preread_size;
130 
131             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
132                            "pipe preread: %z", n);
133 
134             if (n) {
135                 p->read = 1;
136             }
137 
138         } else {
139 
140 #if (NGX_HAVE_KQUEUE)
141 
142             /*
143              * kqueue notifies about the end of file or a pending error.
144              * This test allows not to allocate a buf on these conditions
145              * and not to call c->recv_chain().
146              */
147 
148             if (p->upstream->read->available == 0
149                 && p->upstream->read->pending_eof)
150             {
151                 p->upstream->read->ready = 0;
152                 p->upstream->read->eof = 0;
153                 p->upstream_eof = 1;
154                 p->read = 1;
155 
156                 if (p->upstream->read->kq_errno) {
157                     p->upstream->read->error = 1;
158                     p->upstream_error = 1;
159                     p->upstream_eof = 0;
160 
161                     ngx_log_error(NGX_LOG_ERR, p->log,
162                                   p->upstream->read->kq_errno,
163                                   "kevent() reported that upstream "
164                                   "closed connection");
165                 }
166 
167                 break;
168             }
169 #endif
170 
171             if (p->free_raw_bufs) {
172 
173                 /* use the free bufs if they exist */
174 
175                 chain = p->free_raw_bufs;
176                 if (p->single_buf) {
177                     p->free_raw_bufs = p->free_raw_bufs->next;
178                     chain->next = NULL;
179                 } else {
180                     p->free_raw_bufs = NULL;
181                 }
182 
183             } else if (p->allocated < p->bufs.num) {
184 
185                 /* allocate a new buf if it's still allowed */
186 
187                 b = ngx_create_temp_buf(p->pool, p->bufs.size);
188                 if (b == NULL) {
189                     return NGX_ABORT;
190                 }
191 
192                 p->allocated++;
193 
194                 chain = ngx_alloc_chain_link(p->pool);
195                 if (chain == NULL) {
196                     return NGX_ABORT;
197                 }
198 
199                 chain->buf = b;
200                 chain->next = NULL;
201 
202             } else if (!p->cacheable
203                        && p->downstream->data == p->output_ctx
204                        && p->downstream->write->ready
205                        && !p->downstream->write->delayed)
206             {
207                 /*
208                  * if the bufs are not needed to be saved in a cache and
209                  * a downstream is ready then write the bufs to a downstream
210                  */
211 
212                 p->upstream_blocked = 1;
213 
214                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
215                                "pipe downstream ready");
216 
217                 break;
218 
219             } else if (p->cacheable
220                        || p->temp_file->offset < p->max_temp_file_size)
221             {
222 
223                 /*
224                  * if it is allowed, then save some bufs from r->in
225                  * to a temporary file, and add them to a r->out chain
226                  */
227 
228                 rc = ngx_event_pipe_write_chain_to_temp_file(p);
229 
230                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
231                                "pipe temp offset: %O", p->temp_file->offset);
232 
233                 if (rc == NGX_BUSY) {
234                     break;
235                 }
236 
237                 if (rc == NGX_AGAIN) {
238                     if (ngx_event_flags & NGX_USE_LEVEL_EVENT
239                         && p->upstream->read->active
240                         && p->upstream->read->ready)
241                     {
242                         if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
243                             == NGX_ERROR)
244                         {
245                             return NGX_ABORT;
246                         }
247                     }
248                 }
249 
250                 if (rc != NGX_OK) {
251                     return rc;
252                 }
253 
254                 chain = p->free_raw_bufs;
255                 if (p->single_buf) {
256                     p->free_raw_bufs = p->free_raw_bufs->next;
257                     chain->next = NULL;
258                 } else {
259                     p->free_raw_bufs = NULL;
260                 }
261 
262             } else {
263 
264                 /* there are no bufs to read in */
265 
266                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
267                                "no pipe bufs to read in");
268 
269                 break;
270             }
271 
272             n = p->upstream->recv_chain(p->upstream, chain);
273 
274             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
275                            "pipe recv chain: %z", n);
276 
277             if (p->free_raw_bufs) {
278                 chain->next = p->free_raw_bufs;
279             }
280             p->free_raw_bufs = chain;
281 
282             if (n == NGX_ERROR) {
283                 p->upstream_error = 1;
284                 return NGX_ERROR;
285             }
286 
287             if (n == NGX_AGAIN) {
288                 if (p->single_buf) {
289                     ngx_event_pipe_remove_shadow_links(chain->buf);
290                 }
291 
292                 break;
293             }
294 
295             p->read = 1;
296 
297             if (n == 0) {
298                 p->upstream_eof = 1;
299                 break;
300             }
301         }
302 
303         p->read_length += n;
304         cl = chain;
305         p->free_raw_bufs = NULL;
306 
307         while (cl && n > 0) {
308 
309             ngx_event_pipe_remove_shadow_links(cl->buf);
310 
311             size = cl->buf->end - cl->buf->last;
312 
313             if (n >= size) {
314                 cl->buf->last = cl->buf->end;
315 
316                 /* STUB */ cl->buf->num = p->num++;
317 
318                 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
319                     return NGX_ABORT;
320                 }
321 
322                 n -= size;
323                 ln = cl;
324                 cl = cl->next;
325                 ngx_free_chain(p->pool, ln);
326 
327             } else {
328                 cl->buf->last += n;
329                 n = 0;
330             }
331         }
332 
333         if (cl) {
334             for (ln = cl; ln->next; ln = ln->next) { /* void */ }
335 
336             ln->next = p->free_raw_bufs;
337             p->free_raw_bufs = cl;
338         }
339     }
340 
341 #if (NGX_DEBUG)
342 
343     for (cl = p->busy; cl; cl = cl->next) {
344         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
345                        "pipe buf busy s:%d t:%d f:%d "
346                        "%p, pos %p, size: %z "
347                        "file: %O, size: %z",
348                        (cl->buf->shadow ? 1 : 0),
349                        cl->buf->temporary, cl->buf->in_file,
350                        cl->buf->start, cl->buf->pos,
351                        cl->buf->last - cl->buf->pos,
352                        cl->buf->file_pos,
353                        cl->buf->file_last - cl->buf->file_pos);
354     }
355 
356     for (cl = p->out; cl; cl = cl->next) {
357         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
358                        "pipe buf out  s:%d t:%d f:%d "
359                        "%p, pos %p, size: %z "
360                        "file: %O, size: %z",
361                        (cl->buf->shadow ? 1 : 0),
362                        cl->buf->temporary, cl->buf->in_file,
363                        cl->buf->start, cl->buf->pos,
364                        cl->buf->last - cl->buf->pos,
365                        cl->buf->file_pos,
366                        cl->buf->file_last - cl->buf->file_pos);
367     }
368 
369     for (cl = p->in; cl; cl = cl->next) {
370         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
371                        "pipe buf in   s:%d t:%d f:%d "
372                        "%p, pos %p, size: %z "
373                        "file: %O, size: %z",
374                        (cl->buf->shadow ? 1 : 0),
375                        cl->buf->temporary, cl->buf->in_file,
376                        cl->buf->start, cl->buf->pos,
377                        cl->buf->last - cl->buf->pos,
378                        cl->buf->file_pos,
379                        cl->buf->file_last - cl->buf->file_pos);
380     }
381 
382     for (cl = p->free_raw_bufs; cl; cl = cl->next) {
383         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
384                        "pipe buf free s:%d t:%d f:%d "
385                        "%p, pos %p, size: %z "
386                        "file: %O, size: %z",
387                        (cl->buf->shadow ? 1 : 0),
388                        cl->buf->temporary, cl->buf->in_file,
389                        cl->buf->start, cl->buf->pos,
390                        cl->buf->last - cl->buf->pos,
391                        cl->buf->file_pos,
392                        cl->buf->file_last - cl->buf->file_pos);
393     }
394 
395 #endif
396 
397     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
398 
399         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
400 
401         if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
402             return NGX_ABORT;
403         }
404 
405         p->free_raw_bufs = p->free_raw_bufs->next;
406 
407         if (p->free_bufs && p->buf_to_file == NULL) {
408             for (cl = p->free_raw_bufs; cl; cl = cl->next) {
409                 if (cl->buf->shadow == NULL) {
410                     ngx_pfree(p->pool, cl->buf->start);
411                 }
412             }
413         }
414     }
415 
416     if (p->cacheable && p->in) {
417         if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
418             return NGX_ABORT;
419         }
420     }
421 
422     return NGX_OK;
423 }
424 
425 
426 static ngx_int_t
427 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
428 {
429     u_char            *prev;
430     size_t             bsize;
431     ngx_int_t          rc;
432     ngx_uint_t         flush, flushed, prev_last_shadow;
433     ngx_chain_t       *out, **ll, *cl, file;
434     ngx_connection_t  *downstream;
435 
436     downstream = p->downstream;
437 
438     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
439                    "pipe write downstream: %d", downstream->write->ready);
440 
441     flushed = 0;
442 
443     for ( ;; ) {
444         if (p->downstream_error) {
445             return ngx_event_pipe_drain_chains(p);
446         }
447 
448         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
449 
450             /* pass the p->out and p->in chains to the output filter */
451 
452             for (cl = p->busy; cl; cl = cl->next) {
453                 cl->buf->recycled = 0;
454             }
455 
456             if (p->out) {
457                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
458                                "pipe write downstream flush out");
459 
460                 for (cl = p->out; cl; cl = cl->next) {
461                     cl->buf->recycled = 0;
462                 }
463 
464                 rc = p->output_filter(p->output_ctx, p->out);
465 
466                 if (rc == NGX_ERROR) {
467                     p->downstream_error = 1;
468                     return ngx_event_pipe_drain_chains(p);
469                 }
470 
471                 p->out = NULL;
472             }
473 
474             if (p->in) {
475                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
476                                "pipe write downstream flush in");
477 
478                 for (cl = p->in; cl; cl = cl->next) {
479                     cl->buf->recycled = 0;
480                 }
481 
482                 rc = p->output_filter(p->output_ctx, p->in);
483 
484                 if (rc == NGX_ERROR) {
485                     p->downstream_error = 1;
486                     return ngx_event_pipe_drain_chains(p);
487                 }
488 
489                 p->in = NULL;
490             }
491 
492             if (p->cacheable && p->buf_to_file) {
493 
494                 file.buf = p->buf_to_file;
495                 file.next = NULL;
496 
497                 if (ngx_write_chain_to_temp_file(p->temp_file, &file)
498                     == NGX_ERROR)
499                 {
500                     return NGX_ABORT;
501                 }
502             }
503 
504             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
505                            "pipe write downstream done");
506 
507             /* TODO: free unused bufs */
508 
509             p->downstream_done = 1;
510             break;
511         }
512 
513         if (downstream->data != p->output_ctx
514             || !downstream->write->ready
515             || downstream->write->delayed)
516         {
517             break;
518         }
519 
520         /* bsize is the size of the busy recycled bufs */
521 
522         prev = NULL;
523         bsize = 0;
524 
525         for (cl = p->busy; cl; cl = cl->next) {
526 
527             if (cl->buf->recycled) {
528                 if (prev == cl->buf->start) {
529                     continue;
530                 }
531 
532                 bsize += cl->buf->end - cl->buf->start;
533                 prev = cl->buf->start;
534             }
535         }
536 
537         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
538                        "pipe write busy: %uz", bsize);
539 
540         out = NULL;
541 
542         if (bsize >= (size_t) p->busy_size) {
543             flush = 1;
544             goto flush;
545         }
546 
547         flush = 0;
548         ll = NULL;
549         prev_last_shadow = 1;
550 
551         for ( ;; ) {
552             if (p->out) {
553                 cl = p->out;
554 
555                 if (cl->buf->recycled
556                     && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
557                 {
558                     flush = 1;
559                     break;
560                 }
561 
562                 p->out = p->out->next;
563 
564                 ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);
565 
566             } else if (!p->cacheable && p->in) {
567                 cl = p->in;
568 
569                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
570                                "pipe write buf ls:%d %p %z",
571                                cl->buf->last_shadow,
572                                cl->buf->pos,
573                                cl->buf->last - cl->buf->pos);
574 
575                 if (cl->buf->recycled
576                     && cl->buf->last_shadow
577                     && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
578                 {
579                     if (!prev_last_shadow) {
580                         p->in = p->in->next;
581 
582                         cl->next = NULL;
583 
584                         if (out) {
585                             *ll = cl;
586                         } else {
587                             out = cl;
588                         }
589                     }
590 
591                     flush = 1;
592                     break;
593                 }
594 
595                 prev_last_shadow = cl->buf->last_shadow;
596 
597                 p->in = p->in->next;
598 
599             } else {
600                 break;
601             }
602 
603             if (cl->buf->recycled) {
604                 bsize += cl->buf->last - cl->buf->pos;
605             }
606 
607             cl->next = NULL;
608 
609             if (out) {
610                 *ll = cl;
611             } else {
612                 out = cl;
613             }
614             ll = &cl->next;
615         }
616 
617     flush:
618 
619         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
620                        "pipe write: out:%p, f:%d", out, flush);
621 
622         if (out == NULL) {
623 
624             if (!flush) {
625                 break;
626             }
627 
628             /* a workaround for AIO */
629             if (flushed++ > 10) {
630                 return NGX_BUSY;
631             }
632         }
633 
634         rc = p->output_filter(p->output_ctx, out);
635 
636         if (rc == NGX_ERROR) {
637             p->downstream_error = 1;
638             return ngx_event_pipe_drain_chains(p);
639         }
640 
641         ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
642 
643         for (cl = p->free; cl; cl = cl->next) {
644 
645             if (cl->buf->temp_file) {
646                 if (p->cacheable || !p->cyclic_temp_file) {
647                     continue;
648                 }
649 
650                 /* reset p->temp_offset if all bufs had been sent */
651 
652                 if (cl->buf->file_last == p->temp_file->offset) {
653                     p->temp_file->offset = 0;
654                 }
655             }
656 
657             /* TODO: free buf if p->free_bufs && upstream done */
658 
659             /* add the free shadow raw buf to p->free_raw_bufs */
660 
661             if (cl->buf->last_shadow) {
662                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
663                     return NGX_ABORT;
664                 }
665 
666                 cl->buf->last_shadow = 0;
667             }
668 
669             cl->buf->shadow = NULL;
670         }
671     }
672 
673     return NGX_OK;
674 }
675 
676 
677 static ngx_int_t
678 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
679 {
680     ssize_t       size, bsize;
681     ngx_buf_t    *b;
682     ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_free, fl;
683 
684     if (p->buf_to_file) {
685         fl.buf = p->buf_to_file;
686         fl.next = p->in;
687         out = &fl;
688 
689     } else {
690         out = p->in;
691     }
692 
693     if (!p->cacheable) {
694 
695         size = 0;
696         cl = out;
697         ll = NULL;
698 
699         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
700                        "pipe offset: %O", p->temp_file->offset);
701 
702         do {
703             bsize = cl->buf->last - cl->buf->pos;
704 
705             ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
706                            "pipe buf %p, pos %p, size: %z",
707                            cl->buf->start, cl->buf->pos, bsize);
708 
709             if ((size + bsize > p->temp_file_write_size)
710                || (p->temp_file->offset + size + bsize > p->max_temp_file_size))
711             {
712                 break;
713             }
714 
715             size += bsize;
716             ll = &cl->next;
717             cl = cl->next;
718 
719         } while (cl);
720 
721         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
722 
723         if (ll == NULL) {
724             return NGX_BUSY;
725         }
726 
727         if (cl) {
728            p->in = cl;
729            *ll = NULL;
730 
731         } else {
732            p->in = NULL;
733            p->last_in = &p->in;
734         }
735 
736     } else {
737         p->in = NULL;
738         p->last_in = &p->in;
739     }
740 
741     if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
742         return NGX_ABORT;
743     }
744 
745     for (last_free = &p->free_raw_bufs;
746          *last_free != NULL;
747          last_free = &(*last_free)->next)
748     {
749         /* void */
750     }
751 
752     if (p->buf_to_file) {
753         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
754         p->buf_to_file = NULL;
755         out = out->next;
756     }
757 
758     for (cl = out; cl; cl = next) {
759         next = cl->next;
760         cl->next = NULL;
761 
762         b = cl->buf;
763         b->file = &p->temp_file->file;
764         b->file_pos = p->temp_file->offset;
765         p->temp_file->offset += b->last - b->pos;
766         b->file_last = p->temp_file->offset;
767 
768         b->in_file = 1;
769         b->temp_file = 1;
770 
771         if (p->out) {
772             *p->last_out = cl;
773         } else {
774             p->out = cl;
775         }
776         p->last_out = &cl->next;
777 
778         if (b->last_shadow) {
779 
780             tl = ngx_alloc_chain_link(p->pool);
781             if (tl == NULL) {
782                 return NGX_ABORT;
783             }
784 
785             tl->buf = b->shadow;
786             tl->next = NULL;
787 
788             *last_free = tl;
789             last_free = &tl->next;
790 
791             b->shadow->pos = b->shadow->start;
792             b->shadow->last = b->shadow->start;
793 
794             ngx_event_pipe_remove_shadow_links(b->shadow);
795         }
796     }
797 
798     return NGX_OK;
799 }
800 
801 
802 /* the copy input filter */
803 
804 ngx_int_t
805 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
806 {
807     ngx_buf_t    *b;
808     ngx_chain_t  *cl;
809 
810     if (buf->pos == buf->last) {
811         return NGX_OK;
812     }
813 
814     if (p->free) {
815         cl = p->free;
816         b = cl->buf;
817         p->free = cl->next;
818         ngx_free_chain(p->pool, cl);
819 
820     } else {
821         b = ngx_alloc_buf(p->pool);
822         if (b == NULL) {
823             return NGX_ERROR;
824         }
825     }
826 
827     ngx_memcpy(b, buf, sizeof(ngx_buf_t));
828     b->shadow = buf;
829     b->tag = p->tag;
830     b->last_shadow = 1;
831     b->recycled = 1;
832     buf->shadow = b;
833 
834     cl = ngx_alloc_chain_link(p->pool);
835     if (cl == NULL) {
836         return NGX_ERROR;
837     }
838 
839     cl->buf = b;
840     cl->next = NULL;
841 
842     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
843 
844     if (p->in) {
845         *p->last_in = cl;
846     } else {
847         p->in = cl;
848     }
849     p->last_in = &cl->next;
850 
851     return NGX_OK;
852 }
853 
854 
855 static ngx_inline void
856 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
857 {
858     ngx_buf_t  *b, *next;
859 
860     b = buf->shadow;
861 
862     if (b == NULL) {
863         return;
864     }
865 
866     while (!b->last_shadow) {
867         next = b->shadow;
868 
869         b->temporary = 0;
870         b->recycled = 0;
871 
872         b->shadow = NULL;
873         b = next;
874     }
875 
876     b->temporary = 0;
877     b->recycled = 0;
878     b->last_shadow = 0;
879 
880     b->shadow = NULL;
881 
882     buf->shadow = NULL;
883 }
884 
885 
886 static ngx_inline void
887 ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free, ngx_buf_t *buf)
888 {
889     ngx_buf_t    *s;
890     ngx_chain_t  *cl, **ll;
891 
892     if (buf->shadow == NULL) {
893         return;
894     }
895 
896     for (s = buf->shadow; !s->last_shadow; s = s->shadow) { /* void */ }
897 
898     ll = free;
899 
900     for (cl = *free; cl; cl = cl->next) {
901         if (cl->buf == s) {
902             *ll = cl->next;
903             break;
904         }
905 
906         if (cl->buf->shadow) {
907             break;
908         }
909 
910         ll = &cl->next;
911     }
912 }
913 
914 
915 ngx_int_t
916 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
917 {
918     ngx_chain_t  *cl;
919 
920     cl = ngx_alloc_chain_link(p->pool);
921     if (cl == NULL) {
922         return NGX_ERROR;
923     }
924 
925     b->pos = b->start;
926     b->last = b->start;
927     b->shadow = NULL;
928 
929     cl->buf = b;
930 
931     if (p->free_raw_bufs == NULL) {
932         p->free_raw_bufs = cl;
933         cl->next = NULL;
934 
935         return NGX_OK;
936     }
937 
938     if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
939 
940         /* add the free buf to the list start */
941 
942         cl->next = p->free_raw_bufs;
943         p->free_raw_bufs = cl;
944 
945         return NGX_OK;
946     }
947 
948     /* the first free buf is partialy filled, thus add the free buf after it */
949 
950     cl->next = p->free_raw_bufs->next;
951     p->free_raw_bufs->next = cl;
952 
953     return NGX_OK;
954 }
955 
956 
957 static ngx_int_t
958 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
959 {
960     ngx_chain_t  *cl, *tl;
961 
962     for ( ;; ) {
963         if (p->busy) {
964             cl = p->busy;
965             p->busy = NULL;
966 
967         } else if (p->out) {
968             cl = p->out;
969             p->out = NULL;
970 
971         } else if (p->in) {
972             cl = p->in;
973             p->in = NULL;
974 
975         } else {
976             return NGX_OK;
977         }
978 
979         while (cl) {
980             if (cl->buf->last_shadow) {
981                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
982                     return NGX_ABORT;
983                 }
984 
985                 cl->buf->last_shadow = 0;
986             }
987 
988             cl->buf->shadow = NULL;
989             tl = cl->next;
990             cl->next = p->free;
991             p->free = cl;
992             cl = tl;
993         }
994     }
995 }
996 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.