stream.go (5651B)
1 /* 2 * Copyright 2021 ByteDance Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package decoder 18 19 import ( 20 `bytes` 21 `io` 22 `sync` 23 24 `github.com/bytedance/sonic/option` 25 `github.com/bytedance/sonic/internal/native/types` 26 ) 27 28 var ( 29 minLeftBufferShift uint = 1 30 ) 31 32 // StreamDecoder is the decoder context object for streaming input. 33 type StreamDecoder struct { 34 r io.Reader 35 buf []byte 36 scanp int 37 scanned int64 38 err error 39 Decoder 40 } 41 42 var bufPool = sync.Pool{ 43 New: func () interface{} { 44 return make([]byte, 0, option.DefaultDecoderBufferSize) 45 }, 46 } 47 48 // NewStreamDecoder adapts to encoding/json.NewDecoder API. 49 // 50 // NewStreamDecoder returns a new decoder that reads from r. 51 func NewStreamDecoder(r io.Reader) *StreamDecoder { 52 return &StreamDecoder{r : r} 53 } 54 55 // Decode decodes input stream into val with corresponding data. 56 // Redundantly bytes may be read and left in its buffer, and can be used at next call. 57 // Either io error from underlying io.Reader (except io.EOF) 58 // or syntax error from data will be recorded and stop subsequently decoding. 59 func (self *StreamDecoder) Decode(val interface{}) (err error) { 60 if self.err != nil { 61 return self.err 62 } 63 64 var buf = self.buf[self.scanp:] 65 var p = 0 66 var recycle bool 67 if cap(buf) == 0 { 68 buf = bufPool.Get().([]byte) 69 recycle = true 70 } 71 72 var first = true 73 var repeat = true 74 read_more: 75 for { 76 l := len(buf) 77 realloc(&buf) 78 n, err := self.r.Read(buf[l:cap(buf)]) 79 buf = buf[:l+n] 80 if err != nil { 81 repeat = false 82 if err == io.EOF { 83 if len(buf) == 0 { 84 return err 85 } 86 break 87 } 88 self.err = err 89 return err 90 } 91 if n > 0 || first { 92 break 93 } 94 } 95 first = false 96 97 l := len(buf) 98 if l > 0 { 99 self.Decoder.Reset(string(buf)) 100 err = self.Decoder.Decode(val) 101 if err != nil { 102 if repeat && self.repeatable(err) { 103 goto read_more 104 } 105 self.err = err 106 } 107 108 p = self.Decoder.Pos() 109 self.scanned += int64(p) 110 self.scanp = 0 111 } 112 113 if l > p { 114 // remain undecoded bytes, so copy them into self.buf 115 self.buf = append(self.buf[:0], buf[p:]...) 116 } else { 117 self.buf = nil 118 recycle = true 119 } 120 121 if recycle { 122 buf = buf[:0] 123 bufPool.Put(buf) 124 } 125 return err 126 } 127 128 func (self StreamDecoder) repeatable(err error) bool { 129 if ee, ok := err.(SyntaxError); ok && 130 (ee.Code == types.ERR_EOF || (ee.Code == types.ERR_INVALID_CHAR && self.i >= len(self.s)-1)) { 131 return true 132 } 133 return false 134 } 135 136 // InputOffset returns the input stream byte offset of the current decoder position. 137 // The offset gives the location of the end of the most recently returned token and the beginning of the next token. 138 func (self *StreamDecoder) InputOffset() int64 { 139 return self.scanned + int64(self.scanp) 140 } 141 142 // Buffered returns a reader of the data remaining in the Decoder's buffer. 143 // The reader is valid until the next call to Decode. 144 func (self *StreamDecoder) Buffered() io.Reader { 145 return bytes.NewReader(self.buf[self.scanp:]) 146 } 147 148 // More reports whether there is another element in the 149 // current array or object being parsed. 150 func (self *StreamDecoder) More() bool { 151 if self.err != nil { 152 return false 153 } 154 c, err := self.peek() 155 return err == nil && c != ']' && c != '}' 156 } 157 158 func (self *StreamDecoder) peek() (byte, error) { 159 var err error 160 for { 161 for i := self.scanp; i < len(self.buf); i++ { 162 c := self.buf[i] 163 if isSpace(c) { 164 continue 165 } 166 self.scanp = i 167 return c, nil 168 } 169 // buffer has been scanned, now report any error 170 if err != nil { 171 if err != io.EOF { 172 self.err = err 173 } 174 return 0, err 175 } 176 err = self.refill() 177 } 178 } 179 180 func isSpace(c byte) bool { 181 return types.SPACE_MASK & (1 << c) != 0 182 } 183 184 func (self *StreamDecoder) refill() error { 185 // Make room to read more into the buffer. 186 // First slide down data already consumed. 187 if self.scanp > 0 { 188 self.scanned += int64(self.scanp) 189 n := copy(self.buf, self.buf[self.scanp:]) 190 self.buf = self.buf[:n] 191 self.scanp = 0 192 } 193 194 // Grow buffer if not large enough. 195 realloc(&self.buf) 196 197 // Read. Delay error for next iteration (after scan). 198 n, err := self.r.Read(self.buf[len(self.buf):cap(self.buf)]) 199 self.buf = self.buf[0 : len(self.buf)+n] 200 201 return err 202 } 203 204 func realloc(buf *[]byte) { 205 l := uint(len(*buf)) 206 c := uint(cap(*buf)) 207 if c - l <= c >> minLeftBufferShift { 208 e := l+(l>>minLeftBufferShift) 209 if e < option.DefaultDecoderBufferSize { 210 e = option.DefaultDecoderBufferSize 211 } 212 tmp := make([]byte, l, e) 213 copy(tmp, *buf) 214 *buf = tmp 215 } 216 } 217