chunkreader.go (2674B)
1 package pgproto3 2 3 import ( 4 "io" 5 6 "github.com/jackc/pgx/v5/internal/iobufpool" 7 ) 8 9 // chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and 10 // will read as much as will fit in the current buffer in a single call regardless of how large a read is actually 11 // requested. The memory returned via Next is only valid until the next call to Next. 12 // 13 // This is roughly equivalent to a bufio.Reader that only uses Peek and Discard to never copy bytes. 14 type chunkReader struct { 15 r io.Reader 16 17 buf *[]byte 18 rp, wp int // buf read position and write position 19 20 minBufSize int 21 } 22 23 // newChunkReader creates and returns a new chunkReader for r with default configuration. If minBufSize is <= 0 it uses 24 // a default value. 25 func newChunkReader(r io.Reader, minBufSize int) *chunkReader { 26 if minBufSize <= 0 { 27 // By historical reasons Postgres currently has 8KB send buffer inside, 28 // so here we want to have at least the same size buffer. 29 // @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134 30 // @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru 31 // 32 // In addition, testing has found no benefit of any larger buffer. 33 minBufSize = 8192 34 } 35 36 return &chunkReader{ 37 r: r, 38 minBufSize: minBufSize, 39 buf: iobufpool.Get(minBufSize), 40 } 41 } 42 43 // Next returns buf filled with the next n bytes. buf is only valid until next call of Next. If an error occurs, buf 44 // will be nil. 45 func (r *chunkReader) Next(n int) (buf []byte, err error) { 46 // Reset the buffer if it is empty 47 if r.rp == r.wp { 48 if len(*r.buf) != r.minBufSize { 49 iobufpool.Put(r.buf) 50 r.buf = iobufpool.Get(r.minBufSize) 51 } 52 r.rp = 0 53 r.wp = 0 54 } 55 56 // n bytes already in buf 57 if (r.wp - r.rp) >= n { 58 buf = (*r.buf)[r.rp : r.rp+n : r.rp+n] 59 r.rp += n 60 return buf, err 61 } 62 63 // buf is smaller than requested number of bytes 64 if len(*r.buf) < n { 65 bigBuf := iobufpool.Get(n) 66 r.wp = copy((*bigBuf), (*r.buf)[r.rp:r.wp]) 67 r.rp = 0 68 iobufpool.Put(r.buf) 69 r.buf = bigBuf 70 } 71 72 // buf is large enough, but need to shift filled area to start to make enough contiguous space 73 minReadCount := n - (r.wp - r.rp) 74 if (len(*r.buf) - r.wp) < minReadCount { 75 r.wp = copy((*r.buf), (*r.buf)[r.rp:r.wp]) 76 r.rp = 0 77 } 78 79 // Read at least the required number of bytes from the underlying io.Reader 80 readBytesCount, err := io.ReadAtLeast(r.r, (*r.buf)[r.wp:], minReadCount) 81 r.wp += readBytesCount 82 // fmt.Println("read", n) 83 if err != nil { 84 return nil, err 85 } 86 87 buf = (*r.buf)[r.rp : r.rp+n : r.rp+n] 88 r.rp += n 89 return buf, nil 90 }