gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

stream.go (5651B)


      1 /*
      2  * Copyright 2021 ByteDance Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package decoder
     18 
     19 import (
     20     `bytes`
     21     `io`
     22     `sync`
     23 
     24     `github.com/bytedance/sonic/option`
     25     `github.com/bytedance/sonic/internal/native/types`
     26 )
     27 
     28 var (
     29     minLeftBufferShift   uint = 1
     30 )
     31 
// StreamDecoder is the decoder context object for streaming input.
// It buffers bytes read from an io.Reader and decodes them with the
// embedded Decoder, one Decode call at a time.
type StreamDecoder struct {
    r       io.Reader // input source, set by NewStreamDecoder
    buf     []byte    // bytes read from r; buf[scanp:] is not yet consumed
    scanp   int       // index in buf of the next unconsumed byte
    scanned int64     // count of input bytes consumed before buf[0]; InputOffset adds scanp
    err     error     // sticky error: once set, Decode returns it and More reports false
    Decoder           // decodes the buffered bytes (Reset / Decode / Pos)
}
     41 
     42 var bufPool = sync.Pool{
     43     New: func () interface{} {
     44         return make([]byte, 0, option.DefaultDecoderBufferSize)
     45     },
     46 }
     47 
     48 // NewStreamDecoder adapts to encoding/json.NewDecoder API.
     49 //
     50 // NewStreamDecoder returns a new decoder that reads from r.
     51 func NewStreamDecoder(r io.Reader) *StreamDecoder {
     52     return &StreamDecoder{r : r}
     53 }
     54 
     55 // Decode decodes input stream into val with corresponding data. 
     56 // Redundantly bytes may be read and left in its buffer, and can be used at next call.
     57 // Either io error from underlying io.Reader (except io.EOF) 
     58 // or syntax error from data will be recorded and stop subsequently decoding.
     59 func (self *StreamDecoder) Decode(val interface{}) (err error) {
     60     if self.err != nil {
     61         return self.err
     62     }
     63 
     64     var buf = self.buf[self.scanp:]
     65     var p = 0
     66     var recycle bool
     67     if cap(buf) == 0 {
     68         buf = bufPool.Get().([]byte)
     69         recycle = true
     70     }
     71     
     72     var first = true
     73     var repeat = true
     74 read_more:
     75     for {
     76         l := len(buf)
     77         realloc(&buf)
     78         n, err := self.r.Read(buf[l:cap(buf)])
     79         buf = buf[:l+n]
     80         if err != nil {
     81             repeat = false
     82             if err == io.EOF {
     83                 if len(buf) == 0 {
     84                     return err
     85                 }
     86                 break
     87             }
     88             self.err = err
     89             return err
     90         }
     91         if n > 0 || first {
     92             break
     93         }
     94     }
     95     first = false
     96 
     97     l := len(buf)
     98     if l > 0 {
     99         self.Decoder.Reset(string(buf))
    100         err = self.Decoder.Decode(val)
    101         if err != nil {
    102             if repeat && self.repeatable(err) {
    103                 goto read_more
    104             }
    105             self.err = err
    106         }
    107 
    108         p = self.Decoder.Pos()
    109         self.scanned += int64(p)
    110         self.scanp = 0
    111     }
    112     
    113     if l > p {
    114         // remain undecoded bytes, so copy them into self.buf
    115         self.buf = append(self.buf[:0], buf[p:]...)
    116     } else {
    117         self.buf = nil
    118         recycle = true
    119     }
    120 
    121     if recycle {
    122         buf = buf[:0]
    123         bufPool.Put(buf)
    124     }
    125     return err
    126 }
    127 
    128 func (self StreamDecoder) repeatable(err error) bool {
    129     if ee, ok := err.(SyntaxError); ok && 
    130     (ee.Code == types.ERR_EOF || (ee.Code == types.ERR_INVALID_CHAR && self.i >= len(self.s)-1)) {
    131         return true
    132     }
    133     return false
    134 }
    135 
    136 // InputOffset returns the input stream byte offset of the current decoder position. 
    137 // The offset gives the location of the end of the most recently returned token and the beginning of the next token.
    138 func (self *StreamDecoder) InputOffset() int64 {
    139     return self.scanned + int64(self.scanp)
    140 }
    141 
    142 // Buffered returns a reader of the data remaining in the Decoder's buffer. 
    143 // The reader is valid until the next call to Decode.
    144 func (self *StreamDecoder) Buffered() io.Reader {
    145     return bytes.NewReader(self.buf[self.scanp:])
    146 }
    147 
    148 // More reports whether there is another element in the
    149 // current array or object being parsed.
    150 func (self *StreamDecoder) More() bool {
    151     if self.err != nil {
    152         return false
    153     }
    154     c, err := self.peek()
    155     return err == nil && c != ']' && c != '}'
    156 }
    157 
    158 func (self *StreamDecoder) peek() (byte, error) {
    159     var err error
    160     for {
    161         for i := self.scanp; i < len(self.buf); i++ {
    162             c := self.buf[i]
    163             if isSpace(c) {
    164                 continue
    165             }
    166             self.scanp = i
    167             return c, nil
    168         }
    169         // buffer has been scanned, now report any error
    170         if err != nil {
    171             if err != io.EOF {
    172                 self.err = err
    173             }
    174             return 0, err
    175         }
    176         err = self.refill()
    177     }
    178 }
    179 
    180 func isSpace(c byte) bool {
    181     return types.SPACE_MASK & (1 << c) != 0
    182 }
    183 
    184 func (self *StreamDecoder) refill() error {
    185     // Make room to read more into the buffer.
    186     // First slide down data already consumed.
    187     if self.scanp > 0 {
    188         self.scanned += int64(self.scanp)
    189         n := copy(self.buf, self.buf[self.scanp:])
    190         self.buf = self.buf[:n]
    191         self.scanp = 0
    192     }
    193 
    194     // Grow buffer if not large enough.
    195     realloc(&self.buf)
    196 
    197     // Read. Delay error for next iteration (after scan).
    198     n, err := self.r.Read(self.buf[len(self.buf):cap(self.buf)])
    199     self.buf = self.buf[0 : len(self.buf)+n]
    200 
    201     return err
    202 }
    203 
    204 func realloc(buf *[]byte) {
    205     l := uint(len(*buf))
    206     c := uint(cap(*buf))
    207     if c - l <= c >> minLeftBufferShift {
    208         e := l+(l>>minLeftBufferShift)
    209         if e < option.DefaultDecoderBufferSize {
    210             e = option.DefaultDecoderBufferSize
    211         }
    212         tmp := make([]byte, l, e)
    213         copy(tmp, *buf)
    214         *buf = tmp
    215     }
    216 }
    217