gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

pipe.go (4174B)


      1 // Copyright 2014 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package http2
      6 
      7 import (
      8 	"errors"
      9 	"io"
     10 	"sync"
     11 )
     12 
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
//
// Every method takes mu before touching any other field. The zero
// value has no buffer: Write calls b.Write unconditionally while the
// pipe is open, so setBuffer must be called before the first Write.
type pipe struct {
	mu       sync.Mutex    // guards every field below
	c        sync.Cond     // c.L lazily initialized to &p.mu; signaled by Write and on close/break
	b        pipeBuffer    // nil until setBuffer; nil again when done reading or broken
	unread   int           // bytes that were still in b when a break discarded it
	err      error         // read error once empty. non-nil means closed.
	breakErr error         // immediate read error (caller doesn't see rest of b)
	donec    chan struct{} // created lazily by Done; closed once err or breakErr is set
	readFn   func()        // optional code to run in Read (with mu held) before returning err
}
     26 
     27 type pipeBuffer interface {
     28 	Len() int
     29 	io.Writer
     30 	io.Reader
     31 }
     32 
     33 // setBuffer initializes the pipe buffer.
     34 // It has no effect if the pipe is already closed.
     35 func (p *pipe) setBuffer(b pipeBuffer) {
     36 	p.mu.Lock()
     37 	defer p.mu.Unlock()
     38 	if p.err != nil || p.breakErr != nil {
     39 		return
     40 	}
     41 	p.b = b
     42 }
     43 
     44 func (p *pipe) Len() int {
     45 	p.mu.Lock()
     46 	defer p.mu.Unlock()
     47 	if p.b == nil {
     48 		return p.unread
     49 	}
     50 	return p.b.Len()
     51 }
     52 
     53 // Read waits until data is available and copies bytes
     54 // from the buffer into p.
     55 func (p *pipe) Read(d []byte) (n int, err error) {
     56 	p.mu.Lock()
     57 	defer p.mu.Unlock()
     58 	if p.c.L == nil {
     59 		p.c.L = &p.mu
     60 	}
     61 	for {
     62 		if p.breakErr != nil {
     63 			return 0, p.breakErr
     64 		}
     65 		if p.b != nil && p.b.Len() > 0 {
     66 			return p.b.Read(d)
     67 		}
     68 		if p.err != nil {
     69 			if p.readFn != nil {
     70 				p.readFn()     // e.g. copy trailers
     71 				p.readFn = nil // not sticky like p.err
     72 			}
     73 			p.b = nil
     74 			return 0, p.err
     75 		}
     76 		p.c.Wait()
     77 	}
     78 }
     79 
     80 var errClosedPipeWrite = errors.New("write on closed buffer")
     81 
     82 // Write copies bytes from p into the buffer and wakes a reader.
     83 // It is an error to write more data than the buffer can hold.
     84 func (p *pipe) Write(d []byte) (n int, err error) {
     85 	p.mu.Lock()
     86 	defer p.mu.Unlock()
     87 	if p.c.L == nil {
     88 		p.c.L = &p.mu
     89 	}
     90 	defer p.c.Signal()
     91 	if p.err != nil || p.breakErr != nil {
     92 		return 0, errClosedPipeWrite
     93 	}
     94 	return p.b.Write(d)
     95 }
     96 
     97 // CloseWithError causes the next Read (waking up a current blocked
     98 // Read if needed) to return the provided err after all data has been
     99 // read.
    100 //
    101 // The error must be non-nil.
    102 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
    103 
    104 // BreakWithError causes the next Read (waking up a current blocked
    105 // Read if needed) to return the provided err immediately, without
    106 // waiting for unread data.
    107 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
    108 
    109 // closeWithErrorAndCode is like CloseWithError but also sets some code to run
    110 // in the caller's goroutine before returning the error.
    111 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
    112 
    113 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
    114 	if err == nil {
    115 		panic("err must be non-nil")
    116 	}
    117 	p.mu.Lock()
    118 	defer p.mu.Unlock()
    119 	if p.c.L == nil {
    120 		p.c.L = &p.mu
    121 	}
    122 	defer p.c.Signal()
    123 	if *dst != nil {
    124 		// Already been done.
    125 		return
    126 	}
    127 	p.readFn = fn
    128 	if dst == &p.breakErr {
    129 		if p.b != nil {
    130 			p.unread += p.b.Len()
    131 		}
    132 		p.b = nil
    133 	}
    134 	*dst = err
    135 	p.closeDoneLocked()
    136 }
    137 
    138 // requires p.mu be held.
    139 func (p *pipe) closeDoneLocked() {
    140 	if p.donec == nil {
    141 		return
    142 	}
    143 	// Close if unclosed. This isn't racy since we always
    144 	// hold p.mu while closing.
    145 	select {
    146 	case <-p.donec:
    147 	default:
    148 		close(p.donec)
    149 	}
    150 }
    151 
    152 // Err returns the error (if any) first set by BreakWithError or CloseWithError.
    153 func (p *pipe) Err() error {
    154 	p.mu.Lock()
    155 	defer p.mu.Unlock()
    156 	if p.breakErr != nil {
    157 		return p.breakErr
    158 	}
    159 	return p.err
    160 }
    161 
    162 // Done returns a channel which is closed if and when this pipe is closed
    163 // with CloseWithError.
    164 func (p *pipe) Done() <-chan struct{} {
    165 	p.mu.Lock()
    166 	defer p.mu.Unlock()
    167 	if p.donec == nil {
    168 		p.donec = make(chan struct{})
    169 		if p.err != nil || p.breakErr != nil {
    170 			// Already hit an error.
    171 			p.closeDoneLocked()
    172 		}
    173 	}
    174 	return p.donec
    175 }