reader.go (27028B)
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Copyright (c) 2019+ Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package s2

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"runtime"
	"sync"
)

// ErrCantSeek is returned if the stream cannot be seeked.
type ErrCantSeek struct {
	Reason string
}

// Error returns the error as string.
func (e ErrCantSeek) Error() string {
	return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
}

// NewReader returns a new Reader that decompresses from r, using the framing
// format described at
// https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
	nr := Reader{
		r:        r,
		maxBlock: maxBlockSize,
	}
	for _, opt := range opts {
		if err := opt(&nr); err != nil {
			// Defer the error to the first Read/Skip call rather than failing here.
			nr.err = err
			return &nr
		}
	}
	// The read buffer must hold a worst-case encoded block plus its checksum.
	nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
	if nr.lazyBuf > 0 {
		// An option requested a smaller upfront allocation;
		// ensureBufferSize will grow it later if bigger frames appear.
		nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
	} else {
		nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
	}
	// If the stream identifier is ignored, pretend the header was already read.
	nr.readHeader = nr.ignoreStreamID
	nr.paramsOK = true
	return &nr
}

// ReaderOption is an option for creating a decoder.
type ReaderOption func(*Reader) error

// ReaderMaxBlockSize allows to control allocations if the stream
// has been compressed with a smaller WriterBlockSize, or with the default 1MB.
// Blocks must be this size or smaller to decompress,
// otherwise the decoder will return ErrUnsupported.
//
// For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
//
// Default is the maximum limit of 4MB.
func ReaderMaxBlockSize(blockSize int) ReaderOption {
	return func(r *Reader) error {
		if blockSize > maxBlockSize || blockSize <= 0 {
			return errors.New("s2: block size too large. Must be <= 4MB and > 0")
		}
		// Shrink the lazy allocation too, unless one was set explicitly.
		if r.lazyBuf == 0 && blockSize < defaultBlockSize {
			r.lazyBuf = blockSize
		}
		r.maxBlock = blockSize
		return nil
	}
}

// ReaderAllocBlock allows to control upfront stream allocations
// and not allocate for frames bigger than this initially.
// If frames bigger than this is seen a bigger buffer will be allocated.
//
// Default is 1MB, which is default output size.
func ReaderAllocBlock(blockSize int) ReaderOption {
	return func(r *Reader) error {
		if blockSize > maxBlockSize || blockSize < 1024 {
			return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
		}
		r.lazyBuf = blockSize
		return nil
	}
}

// ReaderIgnoreStreamIdentifier will make the reader skip the expected
// stream identifier at the beginning of the stream.
// This can be used when serving a stream that has been forwarded to a specific point.
func ReaderIgnoreStreamIdentifier() ReaderOption {
	return func(r *Reader) error {
		r.ignoreStreamID = true
		return nil
	}
}

// ReaderSkippableCB will register a callback for chunks with the specified ID.
// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
// For each chunk with the ID, the callback is called with the content.
// Any returned non-nil error will abort decompression.
// Only one callback per ID is supported, latest sent will be used.
107 func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { 108 return func(r *Reader) error { 109 if id < 0x80 || id > 0xfd { 110 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") 111 } 112 r.skippableCB[id] = fn 113 return nil 114 } 115 } 116 117 // ReaderIgnoreCRC will make the reader skip CRC calculation and checks. 118 func ReaderIgnoreCRC() ReaderOption { 119 return func(r *Reader) error { 120 r.ignoreCRC = true 121 return nil 122 } 123 } 124 125 // Reader is an io.Reader that can read Snappy-compressed bytes. 126 type Reader struct { 127 r io.Reader 128 err error 129 decoded []byte 130 buf []byte 131 skippableCB [0x80]func(r io.Reader) error 132 blockStart int64 // Uncompressed offset at start of current. 133 index *Index 134 135 // decoded[i:j] contains decoded bytes that have not yet been passed on. 136 i, j int 137 // maximum block size allowed. 138 maxBlock int 139 // maximum expected buffer size. 140 maxBufSize int 141 // alloc a buffer this size if > 0. 142 lazyBuf int 143 readHeader bool 144 paramsOK bool 145 snappyFrame bool 146 ignoreStreamID bool 147 ignoreCRC bool 148 } 149 150 // ensureBufferSize will ensure that the buffer can take at least n bytes. 151 // If false is returned the buffer exceeds maximum allowed size. 152 func (r *Reader) ensureBufferSize(n int) bool { 153 if n > r.maxBufSize { 154 r.err = ErrCorrupt 155 return false 156 } 157 if cap(r.buf) >= n { 158 return true 159 } 160 // Realloc buffer. 161 r.buf = make([]byte, n) 162 return true 163 } 164 165 // Reset discards any buffered data, resets all state, and switches the Snappy 166 // reader to read from r. This permits reusing a Reader rather than allocating 167 // a new one. 
168 func (r *Reader) Reset(reader io.Reader) { 169 if !r.paramsOK { 170 return 171 } 172 r.index = nil 173 r.r = reader 174 r.err = nil 175 r.i = 0 176 r.j = 0 177 r.blockStart = 0 178 r.readHeader = r.ignoreStreamID 179 } 180 181 func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { 182 if _, r.err = io.ReadFull(r.r, p); r.err != nil { 183 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { 184 r.err = ErrCorrupt 185 } 186 return false 187 } 188 return true 189 } 190 191 // skippable will skip n bytes. 192 // If the supplied reader supports seeking that is used. 193 // tmp is used as a temporary buffer for reading. 194 // The supplied slice does not need to be the size of the read. 195 func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { 196 if id < 0x80 { 197 r.err = fmt.Errorf("interbal error: skippable id < 0x80") 198 return false 199 } 200 if fn := r.skippableCB[id-0x80]; fn != nil { 201 rd := io.LimitReader(r.r, int64(n)) 202 r.err = fn(rd) 203 if r.err != nil { 204 return false 205 } 206 _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp) 207 return r.err == nil 208 } 209 if rs, ok := r.r.(io.ReadSeeker); ok { 210 _, err := rs.Seek(int64(n), io.SeekCurrent) 211 if err == nil { 212 return true 213 } 214 if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { 215 r.err = ErrCorrupt 216 return false 217 } 218 } 219 for n > 0 { 220 if n < len(tmp) { 221 tmp = tmp[:n] 222 } 223 if _, r.err = io.ReadFull(r.r, tmp); r.err != nil { 224 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { 225 r.err = ErrCorrupt 226 } 227 return false 228 } 229 n -= len(tmp) 230 } 231 return true 232 } 233 234 // Read satisfies the io.Reader interface. 
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve pending decoded bytes first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Read the 4-byte chunk header: type + 24-bit little-endian length.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// First 4 bytes are the little-endian CRC of the uncompressed data.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			// Snappy frames have a stricter block-size limit than S2.
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Only the checksum goes through r.buf, but validate the full
			// chunk length against the configured maximum anyway.
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept both the S2 and the plain Snappy magic; remember which,
			// since Snappy frames have tighter block-size limits.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}

// DecodeConcurrent will decode the full stream to w.
// This function should not be combined with reading, seeking or other operations.
// Up to 'concurrent' goroutines will be used.
// If <= 0, runtime.NumCPU will be used.
// On success the number of decompressed bytes and nil is returned.
// This is mainly intended for bigger streams.
func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
	if r.i > 0 || r.j > 0 || r.blockStart > 0 {
		// NOTE(review): this error message looks truncated ("called after ");
		// presumably it should end with something like "other use of the
		// Reader" — confirm against upstream before changing the string.
		return 0, errors.New("DecodeConcurrent called after ")
	}
	if concurrent <= 0 {
		concurrent = runtime.NumCPU()
	}

	// Write to output
	var errMu sync.Mutex
	var aErr error
	// setErr records the first error; later errors are dropped.
	setErr := func(e error) (ok bool) {
		errMu.Lock()
		defer errMu.Unlock()
		if e == nil {
			return aErr == nil
		}
		if aErr == nil {
			aErr = e
		}
		return false
	}
	hasErr := func() (ok bool) {
		errMu.Lock()
		v := aErr != nil
		errMu.Unlock()
		return v
	}

	var aWritten int64
	// toRead: recycled compressed-input buffers.
	// writtenBlocks: recycled decompressed-output buffers.
	// queue: per-block result channels in stream order, so output order is kept.
	// reUse: recycled result channels.
	toRead := make(chan []byte, concurrent)
	writtenBlocks := make(chan []byte, concurrent)
	queue := make(chan chan []byte, concurrent)
	reUse := make(chan chan []byte, concurrent)
	for i := 0; i < concurrent; i++ {
		toRead <- make([]byte, 0, r.maxBufSize)
		writtenBlocks <- make([]byte, 0, r.maxBufSize)
		reUse <- make(chan []byte, 1)
	}
	// Writer
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for toWrite := range queue {
			entry := <-toWrite
			reUse <- toWrite
			if hasErr() {
				// Keep draining so decode goroutines don't block, but stop writing.
				writtenBlocks <- entry
				continue
			}
			n, err := w.Write(entry)
			want := len(entry)
			writtenBlocks <- entry
			if err != nil {
				setErr(err)
				continue
			}
			if n != want {
				setErr(io.ErrShortWrite)
				continue
			}
			aWritten += int64(n)
		}
	}()

	// Reader
	defer func() {
		close(queue)
		if r.err != nil {
			err = r.err
			setErr(r.err)
		}
		wg.Wait()
		if err == nil {
			err = aErr
		}
		written = aWritten
	}()

	for !hasErr() {
		if !r.readFull(r.buf[:4], true) {
			if r.err == io.EOF {
				// Clean end of stream.
				r.err = nil
			}
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if chunkLen > r.maxBufSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			orgBuf := <-toRead
			buf := orgBuf[:chunkLen]

			if !r.readFull(buf, false) {
				return 0, r.err
			}

			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			if n > r.maxBlock {
				r.err = ErrCorrupt
				return 0, r.err
			}
			wg.Add(1)

			decoded := <-writtenBlocks
			// Reserve this block's slot in the output queue before spawning,
			// so the writer emits blocks in stream order.
			entry := <-reUse
			queue <- entry
			go func() {
				defer wg.Done()
				decoded = decoded[:n]
				_, err := Decode(decoded, buf)
				// Input buffer can be reused as soon as decoding is done.
				toRead <- orgBuf
				if err != nil {
					writtenBlocks <- decoded
					setErr(err)
					return
				}
				if !r.ignoreCRC && crc(decoded) != checksum {
					writtenBlocks <- decoded
					setErr(ErrCRC)
					return
				}
				entry <- decoded
			}()
			continue

		case chunkTypeUncompressedData:

			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if chunkLen > r.maxBufSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Grab write buffer
			orgBuf := <-writtenBlocks
			buf := orgBuf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read content.
			n := chunkLen - checksumSize

			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > r.maxBlock {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Read uncompressed
			buf = orgBuf[:n]
			if !r.readFull(buf, false) {
				return 0, r.err
			}

			if !r.ignoreCRC && crc(buf) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			// No decode needed; hand the raw block straight to the writer.
			entry := <-reUse
			queue <- entry
			entry <- buf
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
	return 0, r.err
}

// Skip will skip n bytes forward in the decompressed output.
// For larger skips this consumes less CPU and is faster than reading output and discarding it.
// CRC is not checked on skipped blocks.
// io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
650 // If a decoding error is encountered subsequent calls to Read will also fail. 651 func (r *Reader) Skip(n int64) error { 652 if n < 0 { 653 return errors.New("attempted negative skip") 654 } 655 if r.err != nil { 656 return r.err 657 } 658 659 for n > 0 { 660 if r.i < r.j { 661 // Skip in buffer. 662 // decoded[i:j] contains decoded bytes that have not yet been passed on. 663 left := int64(r.j - r.i) 664 if left >= n { 665 tmp := int64(r.i) + n 666 if tmp > math.MaxInt32 { 667 return errors.New("s2: internal overflow in skip") 668 } 669 r.i = int(tmp) 670 return nil 671 } 672 n -= int64(r.j - r.i) 673 r.i = r.j 674 } 675 676 // Buffer empty; read blocks until we have content. 677 if !r.readFull(r.buf[:4], true) { 678 if r.err == io.EOF { 679 r.err = io.ErrUnexpectedEOF 680 } 681 return r.err 682 } 683 chunkType := r.buf[0] 684 if !r.readHeader { 685 if chunkType != chunkTypeStreamIdentifier { 686 r.err = ErrCorrupt 687 return r.err 688 } 689 r.readHeader = true 690 } 691 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 692 693 // The chunk types are specified at 694 // https://github.com/google/snappy/blob/master/framing_format.txt 695 switch chunkType { 696 case chunkTypeCompressedData: 697 r.blockStart += int64(r.j) 698 // Section 4.2. Compressed data (chunk type 0x00). 
699 if chunkLen < checksumSize { 700 r.err = ErrCorrupt 701 return r.err 702 } 703 if !r.ensureBufferSize(chunkLen) { 704 if r.err == nil { 705 r.err = ErrUnsupported 706 } 707 return r.err 708 } 709 buf := r.buf[:chunkLen] 710 if !r.readFull(buf, false) { 711 return r.err 712 } 713 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 714 buf = buf[checksumSize:] 715 716 dLen, err := DecodedLen(buf) 717 if err != nil { 718 r.err = err 719 return r.err 720 } 721 if dLen > r.maxBlock { 722 r.err = ErrCorrupt 723 return r.err 724 } 725 // Check if destination is within this block 726 if int64(dLen) > n { 727 if len(r.decoded) < dLen { 728 r.decoded = make([]byte, dLen) 729 } 730 if _, err := Decode(r.decoded, buf); err != nil { 731 r.err = err 732 return r.err 733 } 734 if crc(r.decoded[:dLen]) != checksum { 735 r.err = ErrCorrupt 736 return r.err 737 } 738 } else { 739 // Skip block completely 740 n -= int64(dLen) 741 r.blockStart += int64(dLen) 742 dLen = 0 743 } 744 r.i, r.j = 0, dLen 745 continue 746 case chunkTypeUncompressedData: 747 r.blockStart += int64(r.j) 748 // Section 4.3. Uncompressed data (chunk type 0x01). 749 if chunkLen < checksumSize { 750 r.err = ErrCorrupt 751 return r.err 752 } 753 if !r.ensureBufferSize(chunkLen) { 754 if r.err != nil { 755 r.err = ErrUnsupported 756 } 757 return r.err 758 } 759 buf := r.buf[:checksumSize] 760 if !r.readFull(buf, false) { 761 return r.err 762 } 763 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 764 // Read directly into r.decoded instead of via r.buf. 
765 n2 := chunkLen - checksumSize 766 if n2 > len(r.decoded) { 767 if n2 > r.maxBlock { 768 r.err = ErrCorrupt 769 return r.err 770 } 771 r.decoded = make([]byte, n2) 772 } 773 if !r.readFull(r.decoded[:n2], false) { 774 return r.err 775 } 776 if int64(n2) < n { 777 if crc(r.decoded[:n2]) != checksum { 778 r.err = ErrCorrupt 779 return r.err 780 } 781 } 782 r.i, r.j = 0, n2 783 continue 784 case chunkTypeStreamIdentifier: 785 // Section 4.1. Stream identifier (chunk type 0xff). 786 if chunkLen != len(magicBody) { 787 r.err = ErrCorrupt 788 return r.err 789 } 790 if !r.readFull(r.buf[:len(magicBody)], false) { 791 return r.err 792 } 793 if string(r.buf[:len(magicBody)]) != magicBody { 794 if string(r.buf[:len(magicBody)]) != magicBodySnappy { 795 r.err = ErrCorrupt 796 return r.err 797 } 798 } 799 800 continue 801 } 802 803 if chunkType <= 0x7f { 804 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). 805 r.err = ErrUnsupported 806 return r.err 807 } 808 if chunkLen > maxChunkSize { 809 r.err = ErrUnsupported 810 return r.err 811 } 812 // Section 4.4 Padding (chunk type 0xfe). 813 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). 814 if !r.skippable(r.buf, chunkLen, false, chunkType) { 815 return r.err 816 } 817 } 818 return nil 819 } 820 821 // ReadSeeker provides random or forward seeking in compressed content. 822 // See Reader.ReadSeeker 823 type ReadSeeker struct { 824 *Reader 825 readAtMu sync.Mutex 826 } 827 828 // ReadSeeker will return an io.ReadSeeker and io.ReaderAt 829 // compatible version of the reader. 830 // If 'random' is specified the returned io.Seeker can be used for 831 // random seeking, otherwise only forward seeking is supported. 832 // Enabling random seeking requires the original input to support 833 // the io.Seeker interface. 834 // A custom index can be specified which will be used if supplied. 835 // When using a custom index, it will not be read from the input stream. 
// The ReadAt position will affect regular reads and the current position of Seek.
// So using Read after ReadAt will continue from where the ReadAt stopped.
// No functions should be used concurrently.
// The returned ReadSeeker contains a shallow reference to the existing Reader,
// meaning changes performed to one is reflected in the other.
func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
	// Read index if provided.
	if len(index) != 0 {
		if r.index == nil {
			r.index = &Index{}
		}
		if _, err := r.index.Load(index); err != nil {
			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
		}
	}

	// Check if input is seekable
	rs, ok := r.r.(io.ReadSeeker)
	if !ok {
		// Forward-only seeking works without a seekable input (via Skip).
		if !random {
			return &ReadSeeker{Reader: r}, nil
		}
		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
	}

	if r.index != nil {
		// Seekable and index, ok...
		return &ReadSeeker{Reader: r}, nil
	}

	// Load from stream.
	r.index = &Index{}

	// Read current position.
	pos, err := rs.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	// LoadStream looks for the index trailer at the end of the stream.
	err = r.index.LoadStream(rs)
	if err != nil {
		if err == ErrUnsupported {
			// If we don't require random seeking, reset input and return.
			if !random {
				_, err = rs.Seek(pos, io.SeekStart)
				if err != nil {
					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
				}
				r.index = nil
				return &ReadSeeker{Reader: r}, nil
			}
			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
		}
		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
	}

	// reset position.
	_, err = rs.Seek(pos, io.SeekStart)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	return &ReadSeeker{Reader: r}, nil
}

// Seek allows seeking in compressed data.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if r.err != nil {
		if !errors.Is(r.err, io.EOF) {
			return 0, r.err
		}
		// Reset on EOF
		r.err = nil
	}

	// Calculate absolute offset.
	absOffset := offset

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		// Current uncompressed position = start of block + consumed in block.
		absOffset = r.blockStart + int64(r.i) + offset
	case io.SeekEnd:
		if r.index == nil {
			return 0, ErrUnsupported
		}
		absOffset = r.index.TotalUncompressed + offset
	default:
		r.err = ErrUnsupported
		return 0, r.err
	}

	if absOffset < 0 {
		return 0, errors.New("seek before start of file")
	}

	if !r.readHeader {
		// Make sure we read the header.
		_, r.err = r.Read([]byte{})
		if r.err != nil {
			return 0, r.err
		}
	}

	// If we are inside current block no need to seek.
	// This includes no offset changes.
	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
		r.i = int(absOffset - r.blockStart)
		return r.blockStart + int64(r.i), nil
	}

	rs, ok := r.r.(io.ReadSeeker)
	if r.index == nil || !ok {
		// No random access available; forward skips still work.
		currOffset := r.blockStart + int64(r.i)
		if absOffset >= currOffset {
			err := r.Skip(absOffset - currOffset)
			return r.blockStart + int64(r.i), err
		}
		return 0, ErrUnsupported
	}

	// We can seek and we have an index.
	// c is the compressed offset of the block containing absOffset,
	// u is the uncompressed offset at the start of that block (u <= absOffset).
	c, u, err := r.index.Find(absOffset)
	if err != nil {
		return r.blockStart + int64(r.i), err
	}

	// Seek to next block
	_, err = rs.Seek(c, io.SeekStart)
	if err != nil {
		return 0, err
	}

	r.i = r.j                     // Remove rest of current block.
	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
	if u < absOffset {
		// Forward inside block
		return absOffset, r.Skip(absOffset - u)
	}
	if u > absOffset {
		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
	}
	return absOffset, nil
}

// ReadAt reads len(p) bytes into p starting at offset off in the
// underlying input source. It returns the number of bytes
// read (0 <= n <= len(p)) and any error encountered.
//
// When ReadAt returns n < len(p), it returns a non-nil error
// explaining why more bytes were not returned. In this respect,
// ReadAt is stricter than Read.
//
// Even if ReadAt returns n < len(p), it may use all of p as scratch
// space during the call. If some data is available but not len(p) bytes,
// ReadAt blocks until either all the data is available or an error occurs.
// In this respect ReadAt is different from Read.
//
// If the n = len(p) bytes returned by ReadAt are at the end of the
// input source, ReadAt may return either err == EOF or err == nil.
//
// If ReadAt is reading from an input source with a seek offset,
// ReadAt should not affect nor be affected by the underlying
// seek offset.
//
// Clients of ReadAt can execute parallel ReadAt calls on the
// same input source. This is however not recommended.
func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
	r.readAtMu.Lock()
	defer r.readAtMu.Unlock()
	_, err := r.Seek(offset, io.SeekStart)
	if err != nil {
		return 0, err
	}
	n := 0
	for n < len(p) {
		n2, err := r.Read(p[n:])
		if err != nil {
			// This will include io.EOF
			return n + n2, err
		}
		n += n2
	}
	return n, nil
}

// ReadByte satisfies the io.ByteReader interface.
1021 func (r *Reader) ReadByte() (byte, error) { 1022 if r.err != nil { 1023 return 0, r.err 1024 } 1025 if r.i < r.j { 1026 c := r.decoded[r.i] 1027 r.i++ 1028 return c, nil 1029 } 1030 var tmp [1]byte 1031 for i := 0; i < 10; i++ { 1032 n, err := r.Read(tmp[:]) 1033 if err != nil { 1034 return 0, err 1035 } 1036 if n == 1 { 1037 return tmp[0], nil 1038 } 1039 } 1040 return 0, io.ErrNoProgress 1041 } 1042 1043 // SkippableCB will register a callback for chunks with the specified ID. 1044 // ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). 1045 // For each chunk with the ID, the callback is called with the content. 1046 // Any returned non-nil error will abort decompression. 1047 // Only one callback per ID is supported, latest sent will be used. 1048 // Sending a nil function will disable previous callbacks. 1049 func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { 1050 if id < 0x80 || id > chunkTypePadding { 1051 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") 1052 } 1053 r.skippableCB[id] = fn 1054 return nil 1055 }