chunkreader.go (3119B)
1 // Package chunkreader provides an io.Reader wrapper that minimizes IO reads and memory allocations. 2 package chunkreader 3 4 import ( 5 "io" 6 ) 7 8 // ChunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and 9 // will read as much as will fit in the current buffer in a single call regardless of how large a read is actually 10 // requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy. 11 // 12 // The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is 13 // referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare 14 // cases it would be advantageous to copy the bytes to another slice. 15 type ChunkReader struct { 16 r io.Reader 17 18 buf []byte 19 rp, wp int // buf read position and write position 20 21 config Config 22 } 23 24 // Config contains configuration parameters for ChunkReader. 25 type Config struct { 26 MinBufLen int // Minimum buffer length 27 } 28 29 // New creates and returns a new ChunkReader for r with default configuration. 30 func New(r io.Reader) *ChunkReader { 31 cr, err := NewConfig(r, Config{}) 32 if err != nil { 33 panic("default config can't be bad") 34 } 35 36 return cr 37 } 38 39 // NewConfig creates and a new ChunkReader for r configured by config. 40 func NewConfig(r io.Reader, config Config) (*ChunkReader, error) { 41 if config.MinBufLen == 0 { 42 // By historical reasons Postgres currently has 8KB send buffer inside, 43 // so here we want to have at least the same size buffer. 44 // @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134 45 // @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru 46 config.MinBufLen = 8192 47 } 48 49 return &ChunkReader{ 50 r: r, 51 buf: make([]byte, config.MinBufLen), 52 config: config, 53 }, nil 54 } 55 56 // Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy 57 // of buf. If an error occurs, buf will be nil. 58 func (r *ChunkReader) Next(n int) (buf []byte, err error) { 59 // n bytes already in buf 60 if (r.wp - r.rp) >= n { 61 buf = r.buf[r.rp : r.rp+n] 62 r.rp += n 63 return buf, err 64 } 65 66 // available space in buf is less than n 67 if len(r.buf) < n { 68 r.copyBufContents(r.newBuf(n)) 69 } 70 71 // buf is large enough, but need to shift filled area to start to make enough contiguous space 72 minReadCount := n - (r.wp - r.rp) 73 if (len(r.buf) - r.wp) < minReadCount { 74 newBuf := r.newBuf(n) 75 r.copyBufContents(newBuf) 76 } 77 78 if err := r.appendAtLeast(minReadCount); err != nil { 79 return nil, err 80 } 81 82 buf = r.buf[r.rp : r.rp+n] 83 r.rp += n 84 return buf, nil 85 } 86 87 func (r *ChunkReader) appendAtLeast(fillLen int) error { 88 n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen) 89 r.wp += n 90 return err 91 } 92 93 func (r *ChunkReader) newBuf(size int) []byte { 94 if size < r.config.MinBufLen { 95 size = r.config.MinBufLen 96 } 97 return make([]byte, size) 98 } 99 100 func (r *ChunkReader) copyBufContents(dest []byte) { 101 r.wp = copy(dest, r.buf[r.rp:r.wp]) 102 r.rp = 0 103 r.buf = dest 104 }