gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

reader.go (27028B)


      1 // Copyright 2011 The Snappy-Go Authors. All rights reserved.
      2 // Copyright (c) 2019+ Klaus Post. All rights reserved.
      3 // Use of this source code is governed by a BSD-style
      4 // license that can be found in the LICENSE file.
      5 
      6 package s2
      7 
      8 import (
      9 	"errors"
     10 	"fmt"
     11 	"io"
     12 	"io/ioutil"
     13 	"math"
     14 	"runtime"
     15 	"sync"
     16 )
     17 
     18 // ErrCantSeek is returned if the stream cannot be seeked.
     19 type ErrCantSeek struct {
     20 	Reason string
     21 }
     22 
     23 // Error returns the error as string.
     24 func (e ErrCantSeek) Error() string {
     25 	return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
     26 }
     27 
     28 // NewReader returns a new Reader that decompresses from r, using the framing
     29 // format described at
     30 // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
     31 func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
     32 	nr := Reader{
     33 		r:        r,
     34 		maxBlock: maxBlockSize,
     35 	}
     36 	for _, opt := range opts {
     37 		if err := opt(&nr); err != nil {
     38 			nr.err = err
     39 			return &nr
     40 		}
     41 	}
     42 	nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
     43 	if nr.lazyBuf > 0 {
     44 		nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
     45 	} else {
     46 		nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
     47 	}
     48 	nr.readHeader = nr.ignoreStreamID
     49 	nr.paramsOK = true
     50 	return &nr
     51 }
     52 
// ReaderOption is an option for creating a decoder.
// It is called with the Reader being configured and returns a non-nil error
// when the requested setting cannot be applied.
type ReaderOption func(*Reader) error
     55 
     56 // ReaderMaxBlockSize allows to control allocations if the stream
     57 // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
     58 // Blocks must be this size or smaller to decompress,
     59 // otherwise the decoder will return ErrUnsupported.
     60 //
     61 // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
     62 //
     63 // Default is the maximum limit of 4MB.
     64 func ReaderMaxBlockSize(blockSize int) ReaderOption {
     65 	return func(r *Reader) error {
     66 		if blockSize > maxBlockSize || blockSize <= 0 {
     67 			return errors.New("s2: block size too large. Must be <= 4MB and > 0")
     68 		}
     69 		if r.lazyBuf == 0 && blockSize < defaultBlockSize {
     70 			r.lazyBuf = blockSize
     71 		}
     72 		r.maxBlock = blockSize
     73 		return nil
     74 	}
     75 }
     76 
     77 // ReaderAllocBlock allows to control upfront stream allocations
     78 // and not allocate for frames bigger than this initially.
     79 // If frames bigger than this is seen a bigger buffer will be allocated.
     80 //
     81 // Default is 1MB, which is default output size.
     82 func ReaderAllocBlock(blockSize int) ReaderOption {
     83 	return func(r *Reader) error {
     84 		if blockSize > maxBlockSize || blockSize < 1024 {
     85 			return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
     86 		}
     87 		r.lazyBuf = blockSize
     88 		return nil
     89 	}
     90 }
     91 
     92 // ReaderIgnoreStreamIdentifier will make the reader skip the expected
     93 // stream identifier at the beginning of the stream.
     94 // This can be used when serving a stream that has been forwarded to a specific point.
     95 func ReaderIgnoreStreamIdentifier() ReaderOption {
     96 	return func(r *Reader) error {
     97 		r.ignoreStreamID = true
     98 		return nil
     99 	}
    100 }
    101 
    102 // ReaderSkippableCB will register a callback for chuncks with the specified ID.
    103 // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
    104 // For each chunk with the ID, the callback is called with the content.
    105 // Any returned non-nil error will abort decompression.
    106 // Only one callback per ID is supported, latest sent will be used.
    107 func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
    108 	return func(r *Reader) error {
    109 		if id < 0x80 || id > 0xfd {
    110 			return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
    111 		}
    112 		r.skippableCB[id] = fn
    113 		return nil
    114 	}
    115 }
    116 
    117 // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
    118 func ReaderIgnoreCRC() ReaderOption {
    119 	return func(r *Reader) error {
    120 		r.ignoreCRC = true
    121 		return nil
    122 	}
    123 }
    124 
// Reader is an io.Reader that can read Snappy-compressed bytes.
//
// Field overview:
//   - r is the underlying compressed input.
//   - err is the sticky error; once set, Read and Skip return it without reading.
//   - decoded holds the current decoded block; buf is scratch space for chunk
//     headers and chunk payloads.
//   - skippableCB holds the callbacks for skippable chunks. Lookups use the
//     chunk ID minus 0x80 as index.
//   - index is the optional seek index used by ReadSeeker.
//   - readHeader records whether a stream identifier is no longer required;
//     it starts out equal to ignoreStreamID.
//   - paramsOK is set once all options were applied successfully; Reset does
//     nothing unless it is set.
//   - snappyFrame is set when the stream identifier announces a Snappy
//     stream, which enables the Snappy block size limit.
type Reader struct {
	r           io.Reader
	err         error
	decoded     []byte
	buf         []byte
	skippableCB [0x80]func(r io.Reader) error
	blockStart  int64 // Uncompressed offset at start of current.
	index       *Index

	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	maxBufSize int
	// alloc a buffer this size if > 0.
	lazyBuf        int
	readHeader     bool
	paramsOK       bool
	snappyFrame    bool
	ignoreStreamID bool
	ignoreCRC      bool
}
    149 
    150 // ensureBufferSize will ensure that the buffer can take at least n bytes.
    151 // If false is returned the buffer exceeds maximum allowed size.
    152 func (r *Reader) ensureBufferSize(n int) bool {
    153 	if n > r.maxBufSize {
    154 		r.err = ErrCorrupt
    155 		return false
    156 	}
    157 	if cap(r.buf) >= n {
    158 		return true
    159 	}
    160 	// Realloc buffer.
    161 	r.buf = make([]byte, n)
    162 	return true
    163 }
    164 
    165 // Reset discards any buffered data, resets all state, and switches the Snappy
    166 // reader to read from r. This permits reusing a Reader rather than allocating
    167 // a new one.
    168 func (r *Reader) Reset(reader io.Reader) {
    169 	if !r.paramsOK {
    170 		return
    171 	}
    172 	r.index = nil
    173 	r.r = reader
    174 	r.err = nil
    175 	r.i = 0
    176 	r.j = 0
    177 	r.blockStart = 0
    178 	r.readHeader = r.ignoreStreamID
    179 }
    180 
    181 func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
    182 	if _, r.err = io.ReadFull(r.r, p); r.err != nil {
    183 		if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
    184 			r.err = ErrCorrupt
    185 		}
    186 		return false
    187 	}
    188 	return true
    189 }
    190 
    191 // skippable will skip n bytes.
    192 // If the supplied reader supports seeking that is used.
    193 // tmp is used as a temporary buffer for reading.
    194 // The supplied slice does not need to be the size of the read.
    195 func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
    196 	if id < 0x80 {
    197 		r.err = fmt.Errorf("interbal error: skippable id < 0x80")
    198 		return false
    199 	}
    200 	if fn := r.skippableCB[id-0x80]; fn != nil {
    201 		rd := io.LimitReader(r.r, int64(n))
    202 		r.err = fn(rd)
    203 		if r.err != nil {
    204 			return false
    205 		}
    206 		_, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
    207 		return r.err == nil
    208 	}
    209 	if rs, ok := r.r.(io.ReadSeeker); ok {
    210 		_, err := rs.Seek(int64(n), io.SeekCurrent)
    211 		if err == nil {
    212 			return true
    213 		}
    214 		if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
    215 			r.err = ErrCorrupt
    216 			return false
    217 		}
    218 	}
    219 	for n > 0 {
    220 		if n < len(tmp) {
    221 			tmp = tmp[:n]
    222 		}
    223 		if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
    224 			if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
    225 				r.err = ErrCorrupt
    226 			}
    227 			return false
    228 		}
    229 		n -= len(tmp)
    230 	}
    231 	return true
    232 }
    233 
// Read satisfies the io.Reader interface.
//
// Bytes left over from the current decoded block are returned first.
// Otherwise chunks are read from the input until a data chunk has been
// decoded: stream identifiers are validated, skippable chunks are skipped
// (invoking a registered callback if there is one) and reserved unskippable
// chunks fail with ErrUnsupported.
// Any error is stored and returned by all subsequent calls.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve pending decoded bytes first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: 1 byte type followed by a 3 byte little-endian length.
		// A clean EOF is acceptable here, between chunks.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk must be a stream identifier.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// The previous block is fully consumed; advance the block offset past it.
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// The payload starts with a little-endian checksum of the decoded data.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			// Grow the output buffer if needed, within the allowed block size.
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			// The previous block is fully consumed; advance the block offset past it.
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept the S2 identifier as well as the Snappy one; the latter
			// switches on the Snappy block size limit.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
    392 
    393 // DecodeConcurrent will decode the full stream to w.
    394 // This function should not be combined with reading, seeking or other operations.
    395 // Up to 'concurrent' goroutines will be used.
    396 // If <= 0, runtime.NumCPU will be used.
    397 // On success the number of bytes decompressed nil and is returned.
    398 // This is mainly intended for bigger streams.
    399 func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
    400 	if r.i > 0 || r.j > 0 || r.blockStart > 0 {
    401 		return 0, errors.New("DecodeConcurrent called after ")
    402 	}
    403 	if concurrent <= 0 {
    404 		concurrent = runtime.NumCPU()
    405 	}
    406 
    407 	// Write to output
    408 	var errMu sync.Mutex
    409 	var aErr error
    410 	setErr := func(e error) (ok bool) {
    411 		errMu.Lock()
    412 		defer errMu.Unlock()
    413 		if e == nil {
    414 			return aErr == nil
    415 		}
    416 		if aErr == nil {
    417 			aErr = e
    418 		}
    419 		return false
    420 	}
    421 	hasErr := func() (ok bool) {
    422 		errMu.Lock()
    423 		v := aErr != nil
    424 		errMu.Unlock()
    425 		return v
    426 	}
    427 
    428 	var aWritten int64
    429 	toRead := make(chan []byte, concurrent)
    430 	writtenBlocks := make(chan []byte, concurrent)
    431 	queue := make(chan chan []byte, concurrent)
    432 	reUse := make(chan chan []byte, concurrent)
    433 	for i := 0; i < concurrent; i++ {
    434 		toRead <- make([]byte, 0, r.maxBufSize)
    435 		writtenBlocks <- make([]byte, 0, r.maxBufSize)
    436 		reUse <- make(chan []byte, 1)
    437 	}
    438 	// Writer
    439 	var wg sync.WaitGroup
    440 	wg.Add(1)
    441 	go func() {
    442 		defer wg.Done()
    443 		for toWrite := range queue {
    444 			entry := <-toWrite
    445 			reUse <- toWrite
    446 			if hasErr() {
    447 				writtenBlocks <- entry
    448 				continue
    449 			}
    450 			n, err := w.Write(entry)
    451 			want := len(entry)
    452 			writtenBlocks <- entry
    453 			if err != nil {
    454 				setErr(err)
    455 				continue
    456 			}
    457 			if n != want {
    458 				setErr(io.ErrShortWrite)
    459 				continue
    460 			}
    461 			aWritten += int64(n)
    462 		}
    463 	}()
    464 
    465 	// Reader
    466 	defer func() {
    467 		close(queue)
    468 		if r.err != nil {
    469 			err = r.err
    470 			setErr(r.err)
    471 		}
    472 		wg.Wait()
    473 		if err == nil {
    474 			err = aErr
    475 		}
    476 		written = aWritten
    477 	}()
    478 
    479 	for !hasErr() {
    480 		if !r.readFull(r.buf[:4], true) {
    481 			if r.err == io.EOF {
    482 				r.err = nil
    483 			}
    484 			return 0, r.err
    485 		}
    486 		chunkType := r.buf[0]
    487 		if !r.readHeader {
    488 			if chunkType != chunkTypeStreamIdentifier {
    489 				r.err = ErrCorrupt
    490 				return 0, r.err
    491 			}
    492 			r.readHeader = true
    493 		}
    494 		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
    495 
    496 		// The chunk types are specified at
    497 		// https://github.com/google/snappy/blob/master/framing_format.txt
    498 		switch chunkType {
    499 		case chunkTypeCompressedData:
    500 			r.blockStart += int64(r.j)
    501 			// Section 4.2. Compressed data (chunk type 0x00).
    502 			if chunkLen < checksumSize {
    503 				r.err = ErrCorrupt
    504 				return 0, r.err
    505 			}
    506 			if chunkLen > r.maxBufSize {
    507 				r.err = ErrCorrupt
    508 				return 0, r.err
    509 			}
    510 			orgBuf := <-toRead
    511 			buf := orgBuf[:chunkLen]
    512 
    513 			if !r.readFull(buf, false) {
    514 				return 0, r.err
    515 			}
    516 
    517 			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
    518 			buf = buf[checksumSize:]
    519 
    520 			n, err := DecodedLen(buf)
    521 			if err != nil {
    522 				r.err = err
    523 				return 0, r.err
    524 			}
    525 			if r.snappyFrame && n > maxSnappyBlockSize {
    526 				r.err = ErrCorrupt
    527 				return 0, r.err
    528 			}
    529 
    530 			if n > r.maxBlock {
    531 				r.err = ErrCorrupt
    532 				return 0, r.err
    533 			}
    534 			wg.Add(1)
    535 
    536 			decoded := <-writtenBlocks
    537 			entry := <-reUse
    538 			queue <- entry
    539 			go func() {
    540 				defer wg.Done()
    541 				decoded = decoded[:n]
    542 				_, err := Decode(decoded, buf)
    543 				toRead <- orgBuf
    544 				if err != nil {
    545 					writtenBlocks <- decoded
    546 					setErr(err)
    547 					return
    548 				}
    549 				if !r.ignoreCRC && crc(decoded) != checksum {
    550 					writtenBlocks <- decoded
    551 					setErr(ErrCRC)
    552 					return
    553 				}
    554 				entry <- decoded
    555 			}()
    556 			continue
    557 
    558 		case chunkTypeUncompressedData:
    559 
    560 			// Section 4.3. Uncompressed data (chunk type 0x01).
    561 			if chunkLen < checksumSize {
    562 				r.err = ErrCorrupt
    563 				return 0, r.err
    564 			}
    565 			if chunkLen > r.maxBufSize {
    566 				r.err = ErrCorrupt
    567 				return 0, r.err
    568 			}
    569 			// Grab write buffer
    570 			orgBuf := <-writtenBlocks
    571 			buf := orgBuf[:checksumSize]
    572 			if !r.readFull(buf, false) {
    573 				return 0, r.err
    574 			}
    575 			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
    576 			// Read content.
    577 			n := chunkLen - checksumSize
    578 
    579 			if r.snappyFrame && n > maxSnappyBlockSize {
    580 				r.err = ErrCorrupt
    581 				return 0, r.err
    582 			}
    583 			if n > r.maxBlock {
    584 				r.err = ErrCorrupt
    585 				return 0, r.err
    586 			}
    587 			// Read uncompressed
    588 			buf = orgBuf[:n]
    589 			if !r.readFull(buf, false) {
    590 				return 0, r.err
    591 			}
    592 
    593 			if !r.ignoreCRC && crc(buf) != checksum {
    594 				r.err = ErrCRC
    595 				return 0, r.err
    596 			}
    597 			entry := <-reUse
    598 			queue <- entry
    599 			entry <- buf
    600 			continue
    601 
    602 		case chunkTypeStreamIdentifier:
    603 			// Section 4.1. Stream identifier (chunk type 0xff).
    604 			if chunkLen != len(magicBody) {
    605 				r.err = ErrCorrupt
    606 				return 0, r.err
    607 			}
    608 			if !r.readFull(r.buf[:len(magicBody)], false) {
    609 				return 0, r.err
    610 			}
    611 			if string(r.buf[:len(magicBody)]) != magicBody {
    612 				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
    613 					r.err = ErrCorrupt
    614 					return 0, r.err
    615 				} else {
    616 					r.snappyFrame = true
    617 				}
    618 			} else {
    619 				r.snappyFrame = false
    620 			}
    621 			continue
    622 		}
    623 
    624 		if chunkType <= 0x7f {
    625 			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
    626 			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
    627 			r.err = ErrUnsupported
    628 			return 0, r.err
    629 		}
    630 		// Section 4.4 Padding (chunk type 0xfe).
    631 		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
    632 		if chunkLen > maxChunkSize {
    633 			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
    634 			r.err = ErrUnsupported
    635 			return 0, r.err
    636 		}
    637 
    638 		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
    639 		if !r.skippable(r.buf, chunkLen, false, chunkType) {
    640 			return 0, r.err
    641 		}
    642 	}
    643 	return 0, r.err
    644 }
    645 
    646 // Skip will skip n bytes forward in the decompressed output.
    647 // For larger skips this consumes less CPU and is faster than reading output and discarding it.
    648 // CRC is not checked on skipped blocks.
    649 // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
    650 // If a decoding error is encountered subsequent calls to Read will also fail.
    651 func (r *Reader) Skip(n int64) error {
    652 	if n < 0 {
    653 		return errors.New("attempted negative skip")
    654 	}
    655 	if r.err != nil {
    656 		return r.err
    657 	}
    658 
    659 	for n > 0 {
    660 		if r.i < r.j {
    661 			// Skip in buffer.
    662 			// decoded[i:j] contains decoded bytes that have not yet been passed on.
    663 			left := int64(r.j - r.i)
    664 			if left >= n {
    665 				tmp := int64(r.i) + n
    666 				if tmp > math.MaxInt32 {
    667 					return errors.New("s2: internal overflow in skip")
    668 				}
    669 				r.i = int(tmp)
    670 				return nil
    671 			}
    672 			n -= int64(r.j - r.i)
    673 			r.i = r.j
    674 		}
    675 
    676 		// Buffer empty; read blocks until we have content.
    677 		if !r.readFull(r.buf[:4], true) {
    678 			if r.err == io.EOF {
    679 				r.err = io.ErrUnexpectedEOF
    680 			}
    681 			return r.err
    682 		}
    683 		chunkType := r.buf[0]
    684 		if !r.readHeader {
    685 			if chunkType != chunkTypeStreamIdentifier {
    686 				r.err = ErrCorrupt
    687 				return r.err
    688 			}
    689 			r.readHeader = true
    690 		}
    691 		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
    692 
    693 		// The chunk types are specified at
    694 		// https://github.com/google/snappy/blob/master/framing_format.txt
    695 		switch chunkType {
    696 		case chunkTypeCompressedData:
    697 			r.blockStart += int64(r.j)
    698 			// Section 4.2. Compressed data (chunk type 0x00).
    699 			if chunkLen < checksumSize {
    700 				r.err = ErrCorrupt
    701 				return r.err
    702 			}
    703 			if !r.ensureBufferSize(chunkLen) {
    704 				if r.err == nil {
    705 					r.err = ErrUnsupported
    706 				}
    707 				return r.err
    708 			}
    709 			buf := r.buf[:chunkLen]
    710 			if !r.readFull(buf, false) {
    711 				return r.err
    712 			}
    713 			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
    714 			buf = buf[checksumSize:]
    715 
    716 			dLen, err := DecodedLen(buf)
    717 			if err != nil {
    718 				r.err = err
    719 				return r.err
    720 			}
    721 			if dLen > r.maxBlock {
    722 				r.err = ErrCorrupt
    723 				return r.err
    724 			}
    725 			// Check if destination is within this block
    726 			if int64(dLen) > n {
    727 				if len(r.decoded) < dLen {
    728 					r.decoded = make([]byte, dLen)
    729 				}
    730 				if _, err := Decode(r.decoded, buf); err != nil {
    731 					r.err = err
    732 					return r.err
    733 				}
    734 				if crc(r.decoded[:dLen]) != checksum {
    735 					r.err = ErrCorrupt
    736 					return r.err
    737 				}
    738 			} else {
    739 				// Skip block completely
    740 				n -= int64(dLen)
    741 				r.blockStart += int64(dLen)
    742 				dLen = 0
    743 			}
    744 			r.i, r.j = 0, dLen
    745 			continue
    746 		case chunkTypeUncompressedData:
    747 			r.blockStart += int64(r.j)
    748 			// Section 4.3. Uncompressed data (chunk type 0x01).
    749 			if chunkLen < checksumSize {
    750 				r.err = ErrCorrupt
    751 				return r.err
    752 			}
    753 			if !r.ensureBufferSize(chunkLen) {
    754 				if r.err != nil {
    755 					r.err = ErrUnsupported
    756 				}
    757 				return r.err
    758 			}
    759 			buf := r.buf[:checksumSize]
    760 			if !r.readFull(buf, false) {
    761 				return r.err
    762 			}
    763 			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
    764 			// Read directly into r.decoded instead of via r.buf.
    765 			n2 := chunkLen - checksumSize
    766 			if n2 > len(r.decoded) {
    767 				if n2 > r.maxBlock {
    768 					r.err = ErrCorrupt
    769 					return r.err
    770 				}
    771 				r.decoded = make([]byte, n2)
    772 			}
    773 			if !r.readFull(r.decoded[:n2], false) {
    774 				return r.err
    775 			}
    776 			if int64(n2) < n {
    777 				if crc(r.decoded[:n2]) != checksum {
    778 					r.err = ErrCorrupt
    779 					return r.err
    780 				}
    781 			}
    782 			r.i, r.j = 0, n2
    783 			continue
    784 		case chunkTypeStreamIdentifier:
    785 			// Section 4.1. Stream identifier (chunk type 0xff).
    786 			if chunkLen != len(magicBody) {
    787 				r.err = ErrCorrupt
    788 				return r.err
    789 			}
    790 			if !r.readFull(r.buf[:len(magicBody)], false) {
    791 				return r.err
    792 			}
    793 			if string(r.buf[:len(magicBody)]) != magicBody {
    794 				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
    795 					r.err = ErrCorrupt
    796 					return r.err
    797 				}
    798 			}
    799 
    800 			continue
    801 		}
    802 
    803 		if chunkType <= 0x7f {
    804 			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
    805 			r.err = ErrUnsupported
    806 			return r.err
    807 		}
    808 		if chunkLen > maxChunkSize {
    809 			r.err = ErrUnsupported
    810 			return r.err
    811 		}
    812 		// Section 4.4 Padding (chunk type 0xfe).
    813 		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
    814 		if !r.skippable(r.buf, chunkLen, false, chunkType) {
    815 			return r.err
    816 		}
    817 	}
    818 	return nil
    819 }
    820 
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
//
// It embeds the *Reader it was created from, so both share the same state.
// readAtMu is held for the duration of each ReadAt call.
type ReadSeeker struct {
	*Reader
	readAtMu sync.Mutex
}
    827 
    828 // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
    829 // compatible version of the reader.
    830 // If 'random' is specified the returned io.Seeker can be used for
    831 // random seeking, otherwise only forward seeking is supported.
    832 // Enabling random seeking requires the original input to support
    833 // the io.Seeker interface.
    834 // A custom index can be specified which will be used if supplied.
    835 // When using a custom index, it will not be read from the input stream.
    836 // The ReadAt position will affect regular reads and the current position of Seek.
    837 // So using Read after ReadAt will continue from where the ReadAt stopped.
    838 // No functions should be used concurrently.
    839 // The returned ReadSeeker contains a shallow reference to the existing Reader,
    840 // meaning changes performed to one is reflected in the other.
    841 func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
    842 	// Read index if provided.
    843 	if len(index) != 0 {
    844 		if r.index == nil {
    845 			r.index = &Index{}
    846 		}
    847 		if _, err := r.index.Load(index); err != nil {
    848 			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
    849 		}
    850 	}
    851 
    852 	// Check if input is seekable
    853 	rs, ok := r.r.(io.ReadSeeker)
    854 	if !ok {
    855 		if !random {
    856 			return &ReadSeeker{Reader: r}, nil
    857 		}
    858 		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
    859 	}
    860 
    861 	if r.index != nil {
    862 		// Seekable and index, ok...
    863 		return &ReadSeeker{Reader: r}, nil
    864 	}
    865 
    866 	// Load from stream.
    867 	r.index = &Index{}
    868 
    869 	// Read current position.
    870 	pos, err := rs.Seek(0, io.SeekCurrent)
    871 	if err != nil {
    872 		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
    873 	}
    874 	err = r.index.LoadStream(rs)
    875 	if err != nil {
    876 		if err == ErrUnsupported {
    877 			// If we don't require random seeking, reset input and return.
    878 			if !random {
    879 				_, err = rs.Seek(pos, io.SeekStart)
    880 				if err != nil {
    881 					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
    882 				}
    883 				r.index = nil
    884 				return &ReadSeeker{Reader: r}, nil
    885 			}
    886 			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
    887 		}
    888 		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
    889 	}
    890 
    891 	// reset position.
    892 	_, err = rs.Seek(pos, io.SeekStart)
    893 	if err != nil {
    894 		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
    895 	}
    896 	return &ReadSeeker{Reader: r}, nil
    897 }
    898 
    899 // Seek allows seeking in compressed data.
    900 func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
    901 	if r.err != nil {
    902 		if !errors.Is(r.err, io.EOF) {
    903 			return 0, r.err
    904 		}
    905 		// Reset on EOF
    906 		r.err = nil
    907 	}
    908 
    909 	// Calculate absolute offset.
    910 	absOffset := offset
    911 
    912 	switch whence {
    913 	case io.SeekStart:
    914 	case io.SeekCurrent:
    915 		absOffset = r.blockStart + int64(r.i) + offset
    916 	case io.SeekEnd:
    917 		if r.index == nil {
    918 			return 0, ErrUnsupported
    919 		}
    920 		absOffset = r.index.TotalUncompressed + offset
    921 	default:
    922 		r.err = ErrUnsupported
    923 		return 0, r.err
    924 	}
    925 
    926 	if absOffset < 0 {
    927 		return 0, errors.New("seek before start of file")
    928 	}
    929 
    930 	if !r.readHeader {
    931 		// Make sure we read the header.
    932 		_, r.err = r.Read([]byte{})
    933 		if r.err != nil {
    934 			return 0, r.err
    935 		}
    936 	}
    937 
    938 	// If we are inside current block no need to seek.
    939 	// This includes no offset changes.
    940 	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
    941 		r.i = int(absOffset - r.blockStart)
    942 		return r.blockStart + int64(r.i), nil
    943 	}
    944 
    945 	rs, ok := r.r.(io.ReadSeeker)
    946 	if r.index == nil || !ok {
    947 		currOffset := r.blockStart + int64(r.i)
    948 		if absOffset >= currOffset {
    949 			err := r.Skip(absOffset - currOffset)
    950 			return r.blockStart + int64(r.i), err
    951 		}
    952 		return 0, ErrUnsupported
    953 	}
    954 
    955 	// We can seek and we have an index.
    956 	c, u, err := r.index.Find(absOffset)
    957 	if err != nil {
    958 		return r.blockStart + int64(r.i), err
    959 	}
    960 
    961 	// Seek to next block
    962 	_, err = rs.Seek(c, io.SeekStart)
    963 	if err != nil {
    964 		return 0, err
    965 	}
    966 
    967 	r.i = r.j                     // Remove rest of current block.
    968 	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
    969 	if u < absOffset {
    970 		// Forward inside block
    971 		return absOffset, r.Skip(absOffset - u)
    972 	}
    973 	if u > absOffset {
    974 		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
    975 	}
    976 	return absOffset, nil
    977 }
    978 
    979 // ReadAt reads len(p) bytes into p starting at offset off in the
    980 // underlying input source. It returns the number of bytes
    981 // read (0 <= n <= len(p)) and any error encountered.
    982 //
    983 // When ReadAt returns n < len(p), it returns a non-nil error
    984 // explaining why more bytes were not returned. In this respect,
    985 // ReadAt is stricter than Read.
    986 //
    987 // Even if ReadAt returns n < len(p), it may use all of p as scratch
    988 // space during the call. If some data is available but not len(p) bytes,
    989 // ReadAt blocks until either all the data is available or an error occurs.
    990 // In this respect ReadAt is different from Read.
    991 //
    992 // If the n = len(p) bytes returned by ReadAt are at the end of the
    993 // input source, ReadAt may return either err == EOF or err == nil.
    994 //
    995 // If ReadAt is reading from an input source with a seek offset,
    996 // ReadAt should not affect nor be affected by the underlying
    997 // seek offset.
    998 //
    999 // Clients of ReadAt can execute parallel ReadAt calls on the
   1000 // same input source. This is however not recommended.
   1001 func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
   1002 	r.readAtMu.Lock()
   1003 	defer r.readAtMu.Unlock()
   1004 	_, err := r.Seek(offset, io.SeekStart)
   1005 	if err != nil {
   1006 		return 0, err
   1007 	}
   1008 	n := 0
   1009 	for n < len(p) {
   1010 		n2, err := r.Read(p[n:])
   1011 		if err != nil {
   1012 			// This will include io.EOF
   1013 			return n + n2, err
   1014 		}
   1015 		n += n2
   1016 	}
   1017 	return n, nil
   1018 }
   1019 
   1020 // ReadByte satisfies the io.ByteReader interface.
   1021 func (r *Reader) ReadByte() (byte, error) {
   1022 	if r.err != nil {
   1023 		return 0, r.err
   1024 	}
   1025 	if r.i < r.j {
   1026 		c := r.decoded[r.i]
   1027 		r.i++
   1028 		return c, nil
   1029 	}
   1030 	var tmp [1]byte
   1031 	for i := 0; i < 10; i++ {
   1032 		n, err := r.Read(tmp[:])
   1033 		if err != nil {
   1034 			return 0, err
   1035 		}
   1036 		if n == 1 {
   1037 			return tmp[0], nil
   1038 		}
   1039 	}
   1040 	return 0, io.ErrNoProgress
   1041 }
   1042 
   1043 // SkippableCB will register a callback for chunks with the specified ID.
   1044 // ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
   1045 // For each chunk with the ID, the callback is called with the content.
   1046 // Any returned non-nil error will abort decompression.
   1047 // Only one callback per ID is supported, latest sent will be used.
   1048 // Sending a nil function will disable previous callbacks.
   1049 func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
   1050 	if id < 0x80 || id > chunkTypePadding {
   1051 		return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
   1052 	}
   1053 	r.skippableCB[id] = fn
   1054 	return nil
   1055 }