1
2 /*
3 * Copyright (C) Igor Sysoev
4 */
5
6
7 #include <ngx_config.h>
8 #include <ngx_core.h>
9 #include <ngx_event.h>
10 #include <ngx_event_pipe.h>
11
12
13 static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
14 static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
15
16 static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
17 static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
18 static ngx_inline void ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free,
19 ngx_buf_t *buf);
20 static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
21
22
23 ngx_int_t
24 ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
25 {
26 u_int flags;
27 ngx_int_t rc;
28 ngx_event_t *rev, *wev;
29
30 for ( ;; ) {
31 if (do_write) {
32 p->log->action = "sending to client";
33
34 rc = ngx_event_pipe_write_to_downstream(p);
35
36 if (rc == NGX_ABORT) {
37 return NGX_ABORT;
38 }
39
40 if (rc == NGX_BUSY) {
41 return NGX_OK;
42 }
43 }
44
45 p->read = 0;
46 p->upstream_blocked = 0;
47
48 p->log->action = "reading upstream";
49
50 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
51 return NGX_ABORT;
52 }
53
54 if (!p->read && !p->upstream_blocked) {
55 break;
56 }
57
58 do_write = 1;
59 }
60
61 if (p->upstream->fd != -1) {
62 rev = p->upstream->read;
63
64 flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
65
66 if (ngx_handle_read_event(rev, flags) != NGX_OK) {
67 return NGX_ABORT;
68 }
69
70 if (rev->active && !rev->ready) {
71 ngx_add_timer(rev, p->read_timeout);
72
73 } else if (rev->timer_set) {
74 ngx_del_timer(rev);
75 }
76 }
77
78 if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
79 wev = p->downstream->write;
80 if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
81 return NGX_ABORT;
82 }
83
84 if (!wev->delayed) {
85 if (wev->active && !wev->ready) {
86 ngx_add_timer(wev, p->send_timeout);
87
88 } else if (wev->timer_set) {
89 ngx_del_timer(wev);
90 }
91 }
92 }
93
94 return NGX_OK;
95 }
96
97
98 static ngx_int_t
99 ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
100 {
101 ssize_t n, size;
102 ngx_int_t rc;
103 ngx_buf_t *b;
104 ngx_chain_t *chain, *cl, *ln;
105
106 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
107 return NGX_OK;
108 }
109
110 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
111 "pipe read upstream: %d", p->upstream->read->ready);
112
113 for ( ;; ) {
114
115 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
116 break;
117 }
118
119 if (p->preread_bufs == NULL && !p->upstream->read->ready) {
120 break;
121 }
122
123 if (p->preread_bufs) {
124
125 /* use the pre-read bufs if they exist */
126
127 chain = p->preread_bufs;
128 p->preread_bufs = NULL;
129 n = p->preread_size;
130
131 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
132 "pipe preread: %z", n);
133
134 if (n) {
135 p->read = 1;
136 }
137
138 } else {
139
140 #if (NGX_HAVE_KQUEUE)
141
142 /*
143 * kqueue notifies about the end of file or a pending error.
144 * This test allows not to allocate a buf on these conditions
145 * and not to call c->recv_chain().
146 */
147
148 if (p->upstream->read->available == 0
149 && p->upstream->read->pending_eof)
150 {
151 p->upstream->read->ready = 0;
152 p->upstream->read->eof = 0;
153 p->upstream_eof = 1;
154 p->read = 1;
155
156 if (p->upstream->read->kq_errno) {
157 p->upstream->read->error = 1;
158 p->upstream_error = 1;
159 p->upstream_eof = 0;
160
161 ngx_log_error(NGX_LOG_ERR, p->log,
162 p->upstream->read->kq_errno,
163 "kevent() reported that upstream "
164 "closed connection");
165 }
166
167 break;
168 }
169 #endif
170
171 if (p->free_raw_bufs) {
172
173 /* use the free bufs if they exist */
174
175 chain = p->free_raw_bufs;
176 if (p->single_buf) {
177 p->free_raw_bufs = p->free_raw_bufs->next;
178 chain->next = NULL;
179 } else {
180 p->free_raw_bufs = NULL;
181 }
182
183 } else if (p->allocated < p->bufs.num) {
184
185 /* allocate a new buf if it's still allowed */
186
187 b = ngx_create_temp_buf(p->pool, p->bufs.size);
188 if (b == NULL) {
189 return NGX_ABORT;
190 }
191
192 p->allocated++;
193
194 chain = ngx_alloc_chain_link(p->pool);
195 if (chain == NULL) {
196 return NGX_ABORT;
197 }
198
199 chain->buf = b;
200 chain->next = NULL;
201
202 } else if (!p->cacheable
203 && p->downstream->data == p->output_ctx
204 && p->downstream->write->ready
205 && !p->downstream->write->delayed)
206 {
207 /*
208 * if the bufs are not needed to be saved in a cache and
209 * a downstream is ready then write the bufs to a downstream
210 */
211
212 p->upstream_blocked = 1;
213
214 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
215 "pipe downstream ready");
216
217 break;
218
219 } else if (p->cacheable
220 || p->temp_file->offset < p->max_temp_file_size)
221 {
222
223 /*
224 * if it is allowed, then save some bufs from r->in
225 * to a temporary file, and add them to a r->out chain
226 */
227
228 rc = ngx_event_pipe_write_chain_to_temp_file(p);
229
230 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
231 "pipe temp offset: %O", p->temp_file->offset);
232
233 if (rc == NGX_BUSY) {
234 break;
235 }
236
237 if (rc == NGX_AGAIN) {
238 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
239 && p->upstream->read->active
240 && p->upstream->read->ready)
241 {
242 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
243 == NGX_ERROR)
244 {
245 return NGX_ABORT;
246 }
247 }
248 }
249
250 if (rc != NGX_OK) {
251 return rc;
252 }
253
254 chain = p->free_raw_bufs;
255 if (p->single_buf) {
256 p->free_raw_bufs = p->free_raw_bufs->next;
257 chain->next = NULL;
258 } else {
259 p->free_raw_bufs = NULL;
260 }
261
262 } else {
263
264 /* there are no bufs to read in */
265
266 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
267 "no pipe bufs to read in");
268
269 break;
270 }
271
272 n = p->upstream->recv_chain(p->upstream, chain);
273
274 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
275 "pipe recv chain: %z", n);
276
277 if (p->free_raw_bufs) {
278 chain->next = p->free_raw_bufs;
279 }
280 p->free_raw_bufs = chain;
281
282 if (n == NGX_ERROR) {
283 p->upstream_error = 1;
284 return NGX_ERROR;
285 }
286
287 if (n == NGX_AGAIN) {
288 if (p->single_buf) {
289 ngx_event_pipe_remove_shadow_links(chain->buf);
290 }
291
292 break;
293 }
294
295 p->read = 1;
296
297 if (n == 0) {
298 p->upstream_eof = 1;
299 break;
300 }
301 }
302
303 p->read_length += n;
304 cl = chain;
305 p->free_raw_bufs = NULL;
306
307 while (cl && n > 0) {
308
309 ngx_event_pipe_remove_shadow_links(cl->buf);
310
311 size = cl->buf->end - cl->buf->last;
312
313 if (n >= size) {
314 cl->buf->last = cl->buf->end;
315
316 /* STUB */ cl->buf->num = p->num++;
317
318 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
319 return NGX_ABORT;
320 }
321
322 n -= size;
323 ln = cl;
324 cl = cl->next;
325 ngx_free_chain(p->pool, ln);
326
327 } else {
328 cl->buf->last += n;
329 n = 0;
330 }
331 }
332
333 if (cl) {
334 for (ln = cl; ln->next; ln = ln->next) { /* void */ }
335
336 ln->next = p->free_raw_bufs;
337 p->free_raw_bufs = cl;
338 }
339 }
340
341 #if (NGX_DEBUG)
342
343 for (cl = p->busy; cl; cl = cl->next) {
344 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
345 "pipe buf busy s:%d t:%d f:%d "
346 "%p, pos %p, size: %z "
347 "file: %O, size: %z",
348 (cl->buf->shadow ? 1 : 0),
349 cl->buf->temporary, cl->buf->in_file,
350 cl->buf->start, cl->buf->pos,
351 cl->buf->last - cl->buf->pos,
352 cl->buf->file_pos,
353 cl->buf->file_last - cl->buf->file_pos);
354 }
355
356 for (cl = p->out; cl; cl = cl->next) {
357 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
358 "pipe buf out s:%d t:%d f:%d "
359 "%p, pos %p, size: %z "
360 "file: %O, size: %z",
361 (cl->buf->shadow ? 1 : 0),
362 cl->buf->temporary, cl->buf->in_file,
363 cl->buf->start, cl->buf->pos,
364 cl->buf->last - cl->buf->pos,
365 cl->buf->file_pos,
366 cl->buf->file_last - cl->buf->file_pos);
367 }
368
369 for (cl = p->in; cl; cl = cl->next) {
370 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
371 "pipe buf in s:%d t:%d f:%d "
372 "%p, pos %p, size: %z "
373 "file: %O, size: %z",
374 (cl->buf->shadow ? 1 : 0),
375 cl->buf->temporary, cl->buf->in_file,
376 cl->buf->start, cl->buf->pos,
377 cl->buf->last - cl->buf->pos,
378 cl->buf->file_pos,
379 cl->buf->file_last - cl->buf->file_pos);
380 }
381
382 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
383 ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
384 "pipe buf free s:%d t:%d f:%d "
385 "%p, pos %p, size: %z "
386 "file: %O, size: %z",
387 (cl->buf->shadow ? 1 : 0),
388 cl->buf->temporary, cl->buf->in_file,
389 cl->buf->start, cl->buf->pos,
390 cl->buf->last - cl->buf->pos,
391 cl->buf->file_pos,
392 cl->buf->file_last - cl->buf->file_pos);
393 }
394
395 #endif
396
397 if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
398
399 /* STUB */ p->free_raw_bufs->buf->num = p->num++;
400
401 if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
402 return NGX_ABORT;
403 }
404
405 p->free_raw_bufs = p->free_raw_bufs->next;
406
407 if (p->free_bufs && p->buf_to_file == NULL) {
408 for (cl = p->free_raw_bufs; cl; cl = cl->next) {
409 if (cl->buf->shadow == NULL) {
410 ngx_pfree(p->pool, cl->buf->start);
411 }
412 }
413 }
414 }
415
416 if (p->cacheable && p->in) {
417 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
418 return NGX_ABORT;
419 }
420 }
421
422 return NGX_OK;
423 }
424
425
426 static ngx_int_t
427 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
428 {
429 u_char *prev;
430 size_t bsize;
431 ngx_int_t rc;
432 ngx_uint_t flush, flushed, prev_last_shadow;
433 ngx_chain_t *out, **ll, *cl, file;
434 ngx_connection_t *downstream;
435
436 downstream = p->downstream;
437
438 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
439 "pipe write downstream: %d", downstream->write->ready);
440
441 flushed = 0;
442
443 for ( ;; ) {
444 if (p->downstream_error) {
445 return ngx_event_pipe_drain_chains(p);
446 }
447
448 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
449
450 /* pass the p->out and p->in chains to the output filter */
451
452 for (cl = p->busy; cl; cl = cl->next) {
453 cl->buf->recycled = 0;
454 }
455
456 if (p->out) {
457 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
458 "pipe write downstream flush out");
459
460 for (cl = p->out; cl; cl = cl->next) {
461 cl->buf->recycled = 0;
462 }
463
464 rc = p->output_filter(p->output_ctx, p->out);
465
466 if (rc == NGX_ERROR) {
467 p->downstream_error = 1;
468 return ngx_event_pipe_drain_chains(p);
469 }
470
471 p->out = NULL;
472 }
473
474 if (p->in) {
475 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
476 "pipe write downstream flush in");
477
478 for (cl = p->in; cl; cl = cl->next) {
479 cl->buf->recycled = 0;
480 }
481
482 rc = p->output_filter(p->output_ctx, p->in);
483
484 if (rc == NGX_ERROR) {
485 p->downstream_error = 1;
486 return ngx_event_pipe_drain_chains(p);
487 }
488
489 p->in = NULL;
490 }
491
492 if (p->cacheable && p->buf_to_file) {
493
494 file.buf = p->buf_to_file;
495 file.next = NULL;
496
497 if (ngx_write_chain_to_temp_file(p->temp_file, &file)
498 == NGX_ERROR)
499 {
500 return NGX_ABORT;
501 }
502 }
503
504 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
505 "pipe write downstream done");
506
507 /* TODO: free unused bufs */
508
509 p->downstream_done = 1;
510 break;
511 }
512
513 if (downstream->data != p->output_ctx
514 || !downstream->write->ready
515 || downstream->write->delayed)
516 {
517 break;
518 }
519
520 /* bsize is the size of the busy recycled bufs */
521
522 prev = NULL;
523 bsize = 0;
524
525 for (cl = p->busy; cl; cl = cl->next) {
526
527 if (cl->buf->recycled) {
528 if (prev == cl->buf->start) {
529 continue;
530 }
531
532 bsize += cl->buf->end - cl->buf->start;
533 prev = cl->buf->start;
534 }
535 }
536
537 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
538 "pipe write busy: %uz", bsize);
539
540 out = NULL;
541
542 if (bsize >= (size_t) p->busy_size) {
543 flush = 1;
544 goto flush;
545 }
546
547 flush = 0;
548 ll = NULL;
549 prev_last_shadow = 1;
550
551 for ( ;; ) {
552 if (p->out) {
553 cl = p->out;
554
555 if (cl->buf->recycled
556 && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
557 {
558 flush = 1;
559 break;
560 }
561
562 p->out = p->out->next;
563
564 ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);
565
566 } else if (!p->cacheable && p->in) {
567 cl = p->in;
568
569 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
570 "pipe write buf ls:%d %p %z",
571 cl->buf->last_shadow,
572 cl->buf->pos,
573 cl->buf->last - cl->buf->pos);
574
575 if (cl->buf->recycled
576 && cl->buf->last_shadow
577 && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
578 {
579 if (!prev_last_shadow) {
580 p->in = p->in->next;
581
582 cl->next = NULL;
583
584 if (out) {
585 *ll = cl;
586 } else {
587 out = cl;
588 }
589 }
590
591 flush = 1;
592 break;
593 }
594
595 prev_last_shadow = cl->buf->last_shadow;
596
597 p->in = p->in->next;
598
599 } else {
600 break;
601 }
602
603 if (cl->buf->recycled) {
604 bsize += cl->buf->last - cl->buf->pos;
605 }
606
607 cl->next = NULL;
608
609 if (out) {
610 *ll = cl;
611 } else {
612 out = cl;
613 }
614 ll = &cl->next;
615 }
616
617 flush:
618
619 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
620 "pipe write: out:%p, f:%d", out, flush);
621
622 if (out == NULL) {
623
624 if (!flush) {
625 break;
626 }
627
628 /* a workaround for AIO */
629 if (flushed++ > 10) {
630 return NGX_BUSY;
631 }
632 }
633
634 rc = p->output_filter(p->output_ctx, out);
635
636 if (rc == NGX_ERROR) {
637 p->downstream_error = 1;
638 return ngx_event_pipe_drain_chains(p);
639 }
640
641 ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
642
643 for (cl = p->free; cl; cl = cl->next) {
644
645 if (cl->buf->temp_file) {
646 if (p->cacheable || !p->cyclic_temp_file) {
647 continue;
648 }
649
650 /* reset p->temp_offset if all bufs had been sent */
651
652 if (cl->buf->file_last == p->temp_file->offset) {
653 p->temp_file->offset = 0;
654 }
655 }
656
657 /* TODO: free buf if p->free_bufs && upstream done */
658
659 /* add the free shadow raw buf to p->free_raw_bufs */
660
661 if (cl->buf->last_shadow) {
662 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
663 return NGX_ABORT;
664 }
665
666 cl->buf->last_shadow = 0;
667 }
668
669 cl->buf->shadow = NULL;
670 }
671 }
672
673 return NGX_OK;
674 }
675
676
677 static ngx_int_t
678 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
679 {
680 ssize_t size, bsize;
681 ngx_buf_t *b;
682 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_free, fl;
683
684 if (p->buf_to_file) {
685 fl.buf = p->buf_to_file;
686 fl.next = p->in;
687 out = &fl;
688
689 } else {
690 out = p->in;
691 }
692
693 if (!p->cacheable) {
694
695 size = 0;
696 cl = out;
697 ll = NULL;
698
699 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
700 "pipe offset: %O", p->temp_file->offset);
701
702 do {
703 bsize = cl->buf->last - cl->buf->pos;
704
705 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
706 "pipe buf %p, pos %p, size: %z",
707 cl->buf->start, cl->buf->pos, bsize);
708
709 if ((size + bsize > p->temp_file_write_size)
710 || (p->temp_file->offset + size + bsize > p->max_temp_file_size))
711 {
712 break;
713 }
714
715 size += bsize;
716 ll = &cl->next;
717 cl = cl->next;
718
719 } while (cl);
720
721 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
722
723 if (ll == NULL) {
724 return NGX_BUSY;
725 }
726
727 if (cl) {
728 p->in = cl;
729 *ll = NULL;
730
731 } else {
732 p->in = NULL;
733 p->last_in = &p->in;
734 }
735
736 } else {
737 p->in = NULL;
738 p->last_in = &p->in;
739 }
740
741 if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
742 return NGX_ABORT;
743 }
744
745 for (last_free = &p->free_raw_bufs;
746 *last_free != NULL;
747 last_free = &(*last_free)->next)
748 {
749 /* void */
750 }
751
752 if (p->buf_to_file) {
753 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
754 p->buf_to_file = NULL;
755 out = out->next;
756 }
757
758 for (cl = out; cl; cl = next) {
759 next = cl->next;
760 cl->next = NULL;
761
762 b = cl->buf;
763 b->file = &p->temp_file->file;
764 b->file_pos = p->temp_file->offset;
765 p->temp_file->offset += b->last - b->pos;
766 b->file_last = p->temp_file->offset;
767
768 b->in_file = 1;
769 b->temp_file = 1;
770
771 if (p->out) {
772 *p->last_out = cl;
773 } else {
774 p->out = cl;
775 }
776 p->last_out = &cl->next;
777
778 if (b->last_shadow) {
779
780 tl = ngx_alloc_chain_link(p->pool);
781 if (tl == NULL) {
782 return NGX_ABORT;
783 }
784
785 tl->buf = b->shadow;
786 tl->next = NULL;
787
788 *last_free = tl;
789 last_free = &tl->next;
790
791 b->shadow->pos = b->shadow->start;
792 b->shadow->last = b->shadow->start;
793
794 ngx_event_pipe_remove_shadow_links(b->shadow);
795 }
796 }
797
798 return NGX_OK;
799 }
800
801
802 /* the copy input filter */
803
804 ngx_int_t
805 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
806 {
807 ngx_buf_t *b;
808 ngx_chain_t *cl;
809
810 if (buf->pos == buf->last) {
811 return NGX_OK;
812 }
813
814 if (p->free) {
815 cl = p->free;
816 b = cl->buf;
817 p->free = cl->next;
818 ngx_free_chain(p->pool, cl);
819
820 } else {
821 b = ngx_alloc_buf(p->pool);
822 if (b == NULL) {
823 return NGX_ERROR;
824 }
825 }
826
827 ngx_memcpy(b, buf, sizeof(ngx_buf_t));
828 b->shadow = buf;
829 b->tag = p->tag;
830 b->last_shadow = 1;
831 b->recycled = 1;
832 buf->shadow = b;
833
834 cl = ngx_alloc_chain_link(p->pool);
835 if (cl == NULL) {
836 return NGX_ERROR;
837 }
838
839 cl->buf = b;
840 cl->next = NULL;
841
842 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
843
844 if (p->in) {
845 *p->last_in = cl;
846 } else {
847 p->in = cl;
848 }
849 p->last_in = &cl->next;
850
851 return NGX_OK;
852 }
853
854
855 static ngx_inline void
856 ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
857 {
858 ngx_buf_t *b, *next;
859
860 b = buf->shadow;
861
862 if (b == NULL) {
863 return;
864 }
865
866 while (!b->last_shadow) {
867 next = b->shadow;
868
869 b->temporary = 0;
870 b->recycled = 0;
871
872 b->shadow = NULL;
873 b = next;
874 }
875
876 b->temporary = 0;
877 b->recycled = 0;
878 b->last_shadow = 0;
879
880 b->shadow = NULL;
881
882 buf->shadow = NULL;
883 }
884
885
886 static ngx_inline void
887 ngx_event_pipe_free_shadow_raw_buf(ngx_chain_t **free, ngx_buf_t *buf)
888 {
889 ngx_buf_t *s;
890 ngx_chain_t *cl, **ll;
891
892 if (buf->shadow == NULL) {
893 return;
894 }
895
896 for (s = buf->shadow; !s->last_shadow; s = s->shadow) { /* void */ }
897
898 ll = free;
899
900 for (cl = *free; cl; cl = cl->next) {
901 if (cl->buf == s) {
902 *ll = cl->next;
903 break;
904 }
905
906 if (cl->buf->shadow) {
907 break;
908 }
909
910 ll = &cl->next;
911 }
912 }
913
914
915 ngx_int_t
916 ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
917 {
918 ngx_chain_t *cl;
919
920 cl = ngx_alloc_chain_link(p->pool);
921 if (cl == NULL) {
922 return NGX_ERROR;
923 }
924
925 b->pos = b->start;
926 b->last = b->start;
927 b->shadow = NULL;
928
929 cl->buf = b;
930
931 if (p->free_raw_bufs == NULL) {
932 p->free_raw_bufs = cl;
933 cl->next = NULL;
934
935 return NGX_OK;
936 }
937
938 if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
939
940 /* add the free buf to the list start */
941
942 cl->next = p->free_raw_bufs;
943 p->free_raw_bufs = cl;
944
945 return NGX_OK;
946 }
947
948 /* the first free buf is partialy filled, thus add the free buf after it */
949
950 cl->next = p->free_raw_bufs->next;
951 p->free_raw_bufs->next = cl;
952
953 return NGX_OK;
954 }
955
956
957 static ngx_int_t
958 ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
959 {
960 ngx_chain_t *cl, *tl;
961
962 for ( ;; ) {
963 if (p->busy) {
964 cl = p->busy;
965 p->busy = NULL;
966
967 } else if (p->out) {
968 cl = p->out;
969 p->out = NULL;
970
971 } else if (p->in) {
972 cl = p->in;
973 p->in = NULL;
974
975 } else {
976 return NGX_OK;
977 }
978
979 while (cl) {
980 if (cl->buf->last_shadow) {
981 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
982 return NGX_ABORT;
983 }
984
985 cl->buf->last_shadow = 0;
986 }
987
988 cl->buf->shadow = NULL;
989 tl = cl->next;
990 cl->next = p->free;
991 p->free = cl;
992 cl = tl;
993 }
994 }
995 }
996
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.