gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

streamlexer.go (5852B)


      1 package buffer
      2 
      3 import (
      4 	"io"
      5 )
      6 
// block is one entry of a bufferPool: a byte buffer plus the link to the
// block that was stored after it.
// The field order matters: swap builds blocks with positional literals.
type block struct {
	buf    []byte // the stored bytes; its capacity decides whether the block can be reused for a requested size
	next   int    // index in pool plus one of the following block; zero means there is no following block
	active bool   // true while the block holds data that has not been freed; inactive blocks may be handed out again
}
     12 
     13 type bufferPool struct {
     14 	pool []block
     15 	head int // index in pool plus one
     16 	tail int // index in pool plus one
     17 
     18 	pos int // byte pos in tail
     19 }
     20 
     21 func (z *bufferPool) swap(oldBuf []byte, size int) []byte {
     22 	// find new buffer that can be reused
     23 	swap := -1
     24 	for i := 0; i < len(z.pool); i++ {
     25 		if !z.pool[i].active && size <= cap(z.pool[i].buf) {
     26 			swap = i
     27 			break
     28 		}
     29 	}
     30 	if swap == -1 { // no free buffer found for reuse
     31 		if z.tail == 0 && z.pos >= len(oldBuf) && size <= cap(oldBuf) { // but we can reuse the current buffer!
     32 			z.pos -= len(oldBuf)
     33 			return oldBuf[:0]
     34 		}
     35 		// allocate new
     36 		z.pool = append(z.pool, block{make([]byte, 0, size), 0, true})
     37 		swap = len(z.pool) - 1
     38 	}
     39 
     40 	newBuf := z.pool[swap].buf
     41 
     42 	// put current buffer into pool
     43 	z.pool[swap] = block{oldBuf, 0, true}
     44 	if z.head != 0 {
     45 		z.pool[z.head-1].next = swap + 1
     46 	}
     47 	z.head = swap + 1
     48 	if z.tail == 0 {
     49 		z.tail = swap + 1
     50 	}
     51 
     52 	return newBuf[:0]
     53 }
     54 
     55 func (z *bufferPool) free(n int) {
     56 	z.pos += n
     57 	// move the tail over to next buffers
     58 	for z.tail != 0 && z.pos >= len(z.pool[z.tail-1].buf) {
     59 		z.pos -= len(z.pool[z.tail-1].buf)
     60 		newTail := z.pool[z.tail-1].next
     61 		z.pool[z.tail-1].active = false // after this, any thread may pick up the inactive buffer, so it can't be used anymore
     62 		z.tail = newTail
     63 	}
     64 	if z.tail == 0 {
     65 		z.head = 0
     66 	}
     67 }
     68 
     69 // StreamLexer is a buffered reader that allows peeking forward and shifting, taking an io.Reader.
     70 // It keeps data in-memory until Free, taking a byte length, is called to move beyond the data.
     71 type StreamLexer struct {
     72 	r   io.Reader
     73 	err error
     74 
     75 	pool bufferPool
     76 
     77 	buf       []byte
     78 	start     int // index in buf
     79 	pos       int // index in buf
     80 	prevStart int
     81 
     82 	free int
     83 }
     84 
     85 // NewStreamLexer returns a new StreamLexer for a given io.Reader with a 4kB estimated buffer size.
     86 // If the io.Reader implements Bytes, that buffer is used instead.
     87 func NewStreamLexer(r io.Reader) *StreamLexer {
     88 	return NewStreamLexerSize(r, defaultBufSize)
     89 }
     90 
     91 // NewStreamLexerSize returns a new StreamLexer for a given io.Reader and estimated required buffer size.
     92 // If the io.Reader implements Bytes, that buffer is used instead.
     93 func NewStreamLexerSize(r io.Reader, size int) *StreamLexer {
     94 	// if reader has the bytes in memory already, use that instead
     95 	if buffer, ok := r.(interface {
     96 		Bytes() []byte
     97 	}); ok {
     98 		return &StreamLexer{
     99 			err: io.EOF,
    100 			buf: buffer.Bytes(),
    101 		}
    102 	}
    103 	return &StreamLexer{
    104 		r:   r,
    105 		buf: make([]byte, 0, size),
    106 	}
    107 }
    108 
    109 func (z *StreamLexer) read(pos int) byte {
    110 	if z.err != nil {
    111 		return 0
    112 	}
    113 
    114 	// free unused bytes
    115 	z.pool.free(z.free)
    116 	z.free = 0
    117 
    118 	// get new buffer
    119 	c := cap(z.buf)
    120 	p := pos - z.start + 1
    121 	if 2*p > c { // if the token is larger than half the buffer, increase buffer size
    122 		c = 2*c + p
    123 	}
    124 	d := len(z.buf) - z.start
    125 	buf := z.pool.swap(z.buf[:z.start], c)
    126 	copy(buf[:d], z.buf[z.start:]) // copy the left-overs (unfinished token) from the old buffer
    127 
    128 	// read in new data for the rest of the buffer
    129 	var n int
    130 	for pos-z.start >= d && z.err == nil {
    131 		n, z.err = z.r.Read(buf[d:cap(buf)])
    132 		d += n
    133 	}
    134 	pos -= z.start
    135 	z.pos -= z.start
    136 	z.start, z.buf = 0, buf[:d]
    137 	if pos >= d {
    138 		return 0
    139 	}
    140 	return z.buf[pos]
    141 }
    142 
    143 // Err returns the error returned from io.Reader. It may still return valid bytes for a while though.
    144 func (z *StreamLexer) Err() error {
    145 	if z.err == io.EOF && z.pos < len(z.buf) {
    146 		return nil
    147 	}
    148 	return z.err
    149 }
    150 
    151 // Free frees up bytes of length n from previously shifted tokens.
    152 // Each call to Shift should at one point be followed by a call to Free with a length returned by ShiftLen.
    153 func (z *StreamLexer) Free(n int) {
    154 	z.free += n
    155 }
    156 
    157 // Peek returns the ith byte relative to the end position and possibly does an allocation.
    158 // Peek returns zero when an error has occurred, Err returns the error.
    159 // TODO: inline function
    160 func (z *StreamLexer) Peek(pos int) byte {
    161 	pos += z.pos
    162 	if uint(pos) < uint(len(z.buf)) { // uint for BCE
    163 		return z.buf[pos]
    164 	}
    165 	return z.read(pos)
    166 }
    167 
    168 // PeekRune returns the rune and rune length of the ith byte relative to the end position.
    169 func (z *StreamLexer) PeekRune(pos int) (rune, int) {
    170 	// from unicode/utf8
    171 	c := z.Peek(pos)
    172 	if c < 0xC0 {
    173 		return rune(c), 1
    174 	} else if c < 0xE0 {
    175 		return rune(c&0x1F)<<6 | rune(z.Peek(pos+1)&0x3F), 2
    176 	} else if c < 0xF0 {
    177 		return rune(c&0x0F)<<12 | rune(z.Peek(pos+1)&0x3F)<<6 | rune(z.Peek(pos+2)&0x3F), 3
    178 	}
    179 	return rune(c&0x07)<<18 | rune(z.Peek(pos+1)&0x3F)<<12 | rune(z.Peek(pos+2)&0x3F)<<6 | rune(z.Peek(pos+3)&0x3F), 4
    180 }
    181 
    182 // Move advances the position.
    183 func (z *StreamLexer) Move(n int) {
    184 	z.pos += n
    185 }
    186 
    187 // Pos returns a mark to which can be rewinded.
    188 func (z *StreamLexer) Pos() int {
    189 	return z.pos - z.start
    190 }
    191 
    192 // Rewind rewinds the position to the given position.
    193 func (z *StreamLexer) Rewind(pos int) {
    194 	z.pos = z.start + pos
    195 }
    196 
    197 // Lexeme returns the bytes of the current selection.
    198 func (z *StreamLexer) Lexeme() []byte {
    199 	return z.buf[z.start:z.pos]
    200 }
    201 
    202 // Skip collapses the position to the end of the selection.
    203 func (z *StreamLexer) Skip() {
    204 	z.start = z.pos
    205 }
    206 
    207 // Shift returns the bytes of the current selection and collapses the position to the end of the selection.
    208 // It also returns the number of bytes we moved since the last call to Shift. This can be used in calls to Free.
    209 func (z *StreamLexer) Shift() []byte {
    210 	if z.pos > len(z.buf) { // make sure we peeked at least as much as we shift
    211 		z.read(z.pos - 1)
    212 	}
    213 	b := z.buf[z.start:z.pos]
    214 	z.start = z.pos
    215 	return b
    216 }
    217 
    218 // ShiftLen returns the number of bytes moved since the last call to ShiftLen. This can be used in calls to Free because it takes into account multiple Shifts or Skips.
    219 func (z *StreamLexer) ShiftLen() int {
    220 	n := z.start - z.prevStart
    221 	z.prevStart = z.start
    222 	return n
    223 }