writer.go (26597B)
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Copyright (c) 2019+ Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package s2

import (
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"runtime"
	"sync"
)

// Compression levels used by the stream writer.
// levelUncompressed stores all blocks without attempting compression.
const (
	levelUncompressed = iota + 1
	levelFast
	levelBetter
	levelBest
)

// NewWriter returns a new Writer that compresses to w, using the
// framing format described at
// https://github.com/google/snappy/blob/master/framing_format.txt
//
// Users must call Close to guarantee all data has been forwarded to
// the underlying io.Writer and that resources are released.
// They may also call Flush zero or more times before calling Close.
func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
	w2 := Writer{
		blockSize:   defaultBlockSize,
		concurrency: runtime.GOMAXPROCS(0),
		randSrc:     rand.Reader,
		level:       levelFast,
	}
	for _, opt := range opts {
		if err := opt(&w2); err != nil {
			// A bad option makes the writer permanently unusable;
			// the sticky error is reported on first use.
			w2.errState = err
			return &w2
		}
	}
	// Pooled output buffers must fit a chunk header plus a worst-case
	// encoded block for the configured block size.
	w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
	w2.paramsOK = true
	w2.ibuf = make([]byte, 0, w2.blockSize)
	w2.buffers.New = func() interface{} {
		return make([]byte, w2.obufLen)
	}
	w2.Reset(w)
	return &w2
}

// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
	errMu    sync.Mutex // guards errState
	errState error      // first error seen; sticky until Reset (see err)

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	ibuf []byte

	blockSize     int   // max uncompressed bytes per block
	obufLen       int   // capacity of pooled output buffers
	concurrency   int   // number of concurrent block encoders; 1 = fully synchronous
	written       int64 // compressed bytes written to the underlying writer
	uncompWritten int64 // Bytes sent to compression
	output        chan chan result // ordered queue of pending block results; nil when concurrency == 1
	buffers       sync.Pool        // reusable []byte of obufLen capacity
	pad           int              // pad output to a multiple of this on Close; <= 1 disables

	writer    io.Writer                 // destination stream
	randSrc   io.Reader                 // source of padding bytes (crypto/rand by default)
	writerWg  sync.WaitGroup            // waits for the ordered-output goroutine
	index     Index                     // seek index accumulated as blocks are written
	customEnc func(dst, src []byte) int // optional user-supplied block encoder

	// wroteStreamHeader is whether we have written the stream header.
	wroteStreamHeader bool
	paramsOK          bool  // options validated; false makes Reset a no-op
	snappy            bool  // emit Snappy-compatible frames instead of s2
	flushOnWrite      bool  // compress a block on every Write call
	appendIndex       bool  // append the index to the stream on Close
	level             uint8 // one of the level* constants
}

// result is a finished (or pending) block on its way to the output writer.
type result struct {
	b []byte
	// Uncompressed start offset
	startOffset int64
}

// err returns the previously set error.
// If no error has been set it is set to err if not nil.
func (w *Writer) err(err error) error {
	w.errMu.Lock()
	errSet := w.errState
	if errSet == nil && err != nil {
		w.errState = err
		errSet = err
	}
	w.errMu.Unlock()
	return errSet
}

// Reset discards the writer's state and switches the Snappy writer to write to w.
// This permits reusing a Writer rather than allocating a new one.
func (w *Writer) Reset(writer io.Writer) {
	if !w.paramsOK {
		return
	}
	// Close previous writer, if any.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	w.errState = nil
	w.ibuf = w.ibuf[:0]
	w.wroteStreamHeader = false
	w.written = 0
	w.writer = writer
	w.uncompWritten = 0
	w.index.reset(w.blockSize)

	// If we didn't get a writer, stop here.
	if writer == nil {
		return
	}
	// If no concurrency requested, don't spin up writer goroutine.
	if w.concurrency == 1 {
		return
	}

	toWrite := make(chan chan result, w.concurrency)
	w.output = toWrite
	w.writerWg.Add(1)

	// Start a writer goroutine that will write all output in order.
	// Each queued entry is a channel; blocks are queued in submission order
	// and each encoder goroutine delivers its result on its own channel,
	// so output order matches input order regardless of encode completion order.
	go func() {
		defer w.writerWg.Done()

		// Get a queued write.
		for write := range toWrite {
			// Wait for the data to be available.
			input := <-write
			in := input.b
			if len(in) > 0 {
				if w.err(nil) == nil {
					// Don't expose data from previous buffers.
					toWrite := in[:len(in):len(in)]
					// Write to output.
					n, err := writer.Write(toWrite)
					if err == nil && n != len(toWrite) {
						err = io.ErrShortBuffer
					}
					_ = w.err(err)
					w.err(w.index.add(w.written, input.startOffset))
					w.written += int64(n)
				}
			}
			// Only recycle buffers that came from our pool (full obufLen capacity).
			if cap(in) >= w.obufLen {
				w.buffers.Put(in)
			}
			// close the incoming write request.
			// This can be used for synchronizing flushes.
			close(write)
		}
	}()
}

// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.flushOnWrite {
		return w.write(p)
	}
	// If we exceed the input buffer size, start writing
	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
		var n int
		if len(w.ibuf) == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, _ = w.write(p)
		} else {
			// Top up the buffer and flush it as one block.
			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
			w.ibuf = w.ibuf[:len(w.ibuf)+n]
			w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
		}
		nRet += n
		p = p[n:]
	}
	if err := w.err(nil); err != nil {
		return nRet, err
	}
	// p should always be able to fit into w.ibuf now.
	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
	w.ibuf = w.ibuf[:len(w.ibuf)+n]
	nRet += n
	return nRet, nil
}

// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	// Flush buffered data first so blocks are emitted in order.
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return 0, err
		}
	}
	// Fast path: if the reader exposes its backing bytes, encode them directly
	// without any intermediate copy.
	if br, ok := r.(byter); ok {
		buf := br.Bytes()
		if err := w.EncodeBuffer(buf); err != nil {
			return 0, err
		}
		return int64(len(buf)), w.Flush()
	}
	for {
		// Read one full block into a pooled buffer, leaving room for the
		// chunk header at the front so writeFull can fill it in place.
		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
		if err != nil {
			// A short final block is a normal end of stream.
			if err == io.ErrUnexpectedEOF {
				err = io.EOF
			}
			if err != io.EOF {
				return n, w.err(err)
			}
		}
		if n2 == 0 {
			break
		}
		n += int64(n2)
		// writeFull takes ownership of inbuf (it is returned to the pool later).
		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
		if w.err(err2) != nil {
			break
		}

		if err != nil {
			// We got EOF and wrote everything
			break
		}
	}

	return n, w.err(nil)
}

// AddSkippableBlock will add a skippable block to the stream.
// The ID must be 0x80-0xfe (inclusive).
// Length of the skippable block must be <= 16777215 bytes.
260 func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { 261 if err := w.err(nil); err != nil { 262 return err 263 } 264 if len(data) == 0 { 265 return nil 266 } 267 if id < 0x80 || id > chunkTypePadding { 268 return fmt.Errorf("invalid skippable block id %x", id) 269 } 270 if len(data) > maxChunkSize { 271 return fmt.Errorf("skippable block excessed maximum size") 272 } 273 var header [4]byte 274 chunkLen := 4 + len(data) 275 header[0] = id 276 header[1] = uint8(chunkLen >> 0) 277 header[2] = uint8(chunkLen >> 8) 278 header[3] = uint8(chunkLen >> 16) 279 if w.concurrency == 1 { 280 write := func(b []byte) error { 281 n, err := w.writer.Write(b) 282 if err = w.err(err); err != nil { 283 return err 284 } 285 if n != len(data) { 286 return w.err(io.ErrShortWrite) 287 } 288 w.written += int64(n) 289 return w.err(nil) 290 } 291 if !w.wroteStreamHeader { 292 w.wroteStreamHeader = true 293 if w.snappy { 294 if err := write([]byte(magicChunkSnappy)); err != nil { 295 return err 296 } 297 } else { 298 if err := write([]byte(magicChunk)); err != nil { 299 return err 300 } 301 } 302 } 303 if err := write(header[:]); err != nil { 304 return err 305 } 306 if err := write(data); err != nil { 307 return err 308 } 309 } 310 311 // Create output... 312 if !w.wroteStreamHeader { 313 w.wroteStreamHeader = true 314 hWriter := make(chan result) 315 w.output <- hWriter 316 if w.snappy { 317 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} 318 } else { 319 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} 320 } 321 } 322 323 // Copy input. 324 inbuf := w.buffers.Get().([]byte)[:4] 325 copy(inbuf, header[:]) 326 inbuf = append(inbuf, data...) 327 328 output := make(chan result, 1) 329 // Queue output. 330 w.output <- output 331 output <- result{startOffset: w.uncompWritten, b: inbuf} 332 333 return nil 334 } 335 336 // EncodeBuffer will add a buffer to the stream. 
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// If you cannot control that, use the regular Write function.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.flushOnWrite {
		_, err := w.write(buf)
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(buf)
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
		}
	}

	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		// Compress asynchronously; uncompressed aliases the caller's buffer,
		// hence the "do not write until Flush/Close" contract above.
		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}

// encodeBlock compresses uncompressed into obuf using the configured level
// (or the custom encoder) and returns the number of bytes used in obuf.
// A return of 0 means the block should be stored uncompressed.
func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
	if w.customEnc != nil {
		// A negative result from the custom encoder means "fall through to
		// the built-in encoders" (see WriterCustomEncoder).
		if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
			return ret
		}
	}
	if w.snappy {
		switch w.level {
		case levelFast:
			return encodeBlockSnappy(obuf, uncompressed)
		case levelBetter:
			return encodeBlockBetterSnappy(obuf, uncompressed)
		case levelBest:
			return encodeBlockBestSnappy(obuf, uncompressed)
		}
		// levelUncompressed: never compress.
		return 0
	}
	switch w.level {
	case levelFast:
		return encodeBlock(obuf, uncompressed)
	case levelBetter:
		return encodeBlockBetter(obuf, uncompressed)
	case levelBest:
		return encodeBlockBest(obuf, uncompressed, nil)
	}
	// levelUncompressed: never compress.
	return 0
}

// write splits p into blocks and queues them for (possibly asynchronous)
// compression. The input is copied, so the caller may reuse p immediately.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			if w.snappy {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
			} else {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
			}
		}

		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		// Copy input.
		// If the block is incompressible, this is used for the result.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))

		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				obuf, inbuf = inbuf, obuf
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res

			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}

// writeFull is a special version of write that will always write the full buffer.
// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
// The data will be written as a single block.
// The caller is not allowed to use inbuf after this function has been called.
func (w *Writer) writeFull(inbuf []byte) (errRet error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.concurrency == 1 {
		_, err := w.writeSync(inbuf[obufHeaderLen:])
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
		}
	}

	// Get an output buffer.
	obuf := w.buffers.Get().([]byte)[:w.obufLen]
	uncompressed := inbuf[obufHeaderLen:]

	output := make(chan result)
	// Queue output now, so we keep order.
	w.output <- output
	res := result{
		startOffset: w.uncompWritten,
	}
	w.uncompWritten += int64(len(uncompressed))

	go func() {
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		// Check if we should use this, or store as uncompressed instead.
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Use input as output.
			obuf, inbuf = inbuf, obuf
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		// Queue final output.
		res.b = obuf
		output <- res

		// Put unused buffer back in pool.
		w.buffers.Put(inbuf)
	}()
	return nil
}

// writeSync compresses and writes p block-by-block, synchronously, directly
// to the underlying writer. Used when concurrency == 1 or by flushOnWrite.
func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		var n int
		var err error
		if w.snappy {
			n, err = w.writer.Write([]byte(magicChunkSnappy))
		} else {
			n, err = w.writer.Write([]byte(magicChunk))
		}
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(magicChunk) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
	}

	for len(p) > 0 {
		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Incompressible: only the 8-byte header is emitted from obuf,
			// the raw data is written separately below.
			obuf = obuf[:8]
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		n, err := w.writer.Write(obuf)
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(obuf) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.err(w.index.add(w.written, w.uncompWritten))
		w.written += int64(n)
		w.uncompWritten += int64(len(uncompressed))

		if chunkType == chunkTypeUncompressedData {
			// Write uncompressed data.
			n, err := w.writer.Write(uncompressed)
			if err != nil {
				return 0, w.err(err)
			}
			if n != len(uncompressed) {
				return 0, w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
		}
		w.buffers.Put(obuf)
		// Queue final output.
		nRet += len(uncompressed)
	}
	return nRet, nil
}

// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
	if err := w.err(nil); err != nil {
		return err
	}

	// Queue any data still in input buffer.
	if len(w.ibuf) != 0 {
		if !w.wroteStreamHeader {
			_, err := w.writeSync(w.ibuf)
			w.ibuf = w.ibuf[:0]
			return w.err(err)
		} else {
			_, err := w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
			err = w.err(err)
			if err != nil {
				return err
			}
		}
	}
	if w.output == nil {
		return w.err(nil)
	}

	// Send empty buffer
	res := make(chan result)
	w.output <- res
	// Block until this has been picked up.
	res <- result{b: nil, startOffset: w.uncompWritten}
	// When it is closed, we have flushed.
	<-res
	return w.err(nil)
}

// Close calls Flush and then closes the Writer.
// Calling Close multiple times is ok,
// but calling CloseIndex after this will make it not return the index.
func (w *Writer) Close() error {
	_, err := w.closeIndex(w.appendIndex)
	return err
}

// CloseIndex calls Close and returns an index on first call.
// This is not required if you are only adding index to a stream.
func (w *Writer) CloseIndex() ([]byte, error) {
	return w.closeIndex(true)
}

// closeIndex flushes and shuts down the writer, optionally building the
// stream index, applying padding, and appending the index if configured.
// It returns the index (if idx is true) on the first successful close.
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
	err := w.Flush()
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}

	var index []byte
	if w.err(nil) == nil && w.writer != nil {
		// Create index.
		if idx {
			compSize := int64(-1)
			if w.pad <= 1 {
				compSize = w.written
			}
			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
			// Count as written for padding.
			if w.appendIndex {
				w.written += int64(len(index))
			}
		}

		if w.pad > 1 {
			tmp := w.ibuf[:0]
			if len(index) > 0 {
				// Allocate another buffer.
				tmp = w.buffers.Get().([]byte)[:0]
				defer w.buffers.Put(tmp)
			}
			add := calcSkippableFrame(w.written, int64(w.pad))
			frame, err := skippableFrame(tmp, add, w.randSrc)
			if err = w.err(err); err != nil {
				return nil, err
			}
			n, err2 := w.writer.Write(frame)
			if err2 == nil && n != len(frame) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
		if len(index) > 0 && w.appendIndex {
			n, err2 := w.writer.Write(index)
			if err2 == nil && n != len(index) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
	}
	// Mark the writer closed; errClosed here means the close succeeded.
	err = w.err(errClosed)
	if err == errClosed {
		return index, nil
	}
	return nil, err
}

// calcSkippableFrame will return a total size to be added for written
// to be divisible by multiple.
// The value will always be > skippableFrameHeader.
// The function will panic if written < 0 or wantMultiple <= 0.
func calcSkippableFrame(written, wantMultiple int64) int {
	if wantMultiple <= 0 {
		panic("wantMultiple <= 0")
	}
	if written < 0 {
		panic("written < 0")
	}
	leftOver := written % wantMultiple
	if leftOver == 0 {
		return 0
	}
	toAdd := wantMultiple - leftOver
	// A skippable frame cannot be smaller than its own header; add whole
	// multiples until it fits.
	for toAdd < skippableFrameHeader {
		toAdd += wantMultiple
	}
	return int(toAdd)
}

// skippableFrame will add a skippable frame with a total size of bytes.
844 // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader 845 func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) { 846 if total == 0 { 847 return dst, nil 848 } 849 if total < skippableFrameHeader { 850 return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total) 851 } 852 if int64(total) >= maxBlockSize+skippableFrameHeader { 853 return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total) 854 } 855 // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)" 856 dst = append(dst, chunkTypePadding) 857 f := uint32(total - skippableFrameHeader) 858 // Add chunk length. 859 dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16)) 860 // Add data 861 start := len(dst) 862 dst = append(dst, make([]byte, f)...) 863 _, err := io.ReadFull(r, dst[start:]) 864 return dst, err 865 } 866 867 var errClosed = errors.New("s2: Writer is closed") 868 869 // WriterOption is an option for creating a encoder. 870 type WriterOption func(*Writer) error 871 872 // WriterConcurrency will set the concurrency, 873 // meaning the maximum number of decoders to run concurrently. 874 // The value supplied must be at least 1. 875 // By default this will be set to GOMAXPROCS. 876 func WriterConcurrency(n int) WriterOption { 877 return func(w *Writer) error { 878 if n <= 0 { 879 return errors.New("concurrency must be at least 1") 880 } 881 w.concurrency = n 882 return nil 883 } 884 } 885 886 // WriterAddIndex will append an index to the end of a stream 887 // when it is closed. 888 func WriterAddIndex() WriterOption { 889 return func(w *Writer) error { 890 w.appendIndex = true 891 return nil 892 } 893 } 894 895 // WriterBetterCompression will enable better compression. 896 // EncodeBetter compresses better than Encode but typically with a 897 // 10-40% speed decrease on both compression and decompression. 
898 func WriterBetterCompression() WriterOption { 899 return func(w *Writer) error { 900 w.level = levelBetter 901 return nil 902 } 903 } 904 905 // WriterBestCompression will enable better compression. 906 // EncodeBetter compresses better than Encode but typically with a 907 // big speed decrease on compression. 908 func WriterBestCompression() WriterOption { 909 return func(w *Writer) error { 910 w.level = levelBest 911 return nil 912 } 913 } 914 915 // WriterUncompressed will bypass compression. 916 // The stream will be written as uncompressed blocks only. 917 // If concurrency is > 1 CRC and output will still be done async. 918 func WriterUncompressed() WriterOption { 919 return func(w *Writer) error { 920 w.level = levelUncompressed 921 return nil 922 } 923 } 924 925 // WriterBlockSize allows to override the default block size. 926 // Blocks will be this size or smaller. 927 // Minimum size is 4KB and and maximum size is 4MB. 928 // 929 // Bigger blocks may give bigger throughput on systems with many cores, 930 // and will increase compression slightly, but it will limit the possible 931 // concurrency for smaller payloads for both encoding and decoding. 932 // Default block size is 1MB. 933 // 934 // When writing Snappy compatible output using WriterSnappyCompat, 935 // the maximum block size is 64KB. 936 func WriterBlockSize(n int) WriterOption { 937 return func(w *Writer) error { 938 if w.snappy && n > maxSnappyBlockSize || n < minBlockSize { 939 return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output") 940 } 941 if n > maxBlockSize || n < minBlockSize { 942 return errors.New("s2: block size too large. Must be <= 4MB and >=4KB") 943 } 944 w.blockSize = n 945 return nil 946 } 947 } 948 949 // WriterPadding will add padding to all output so the size will be a multiple of n. 950 // This can be used to obfuscate the exact output size or make blocks of a certain size. 
951 // The contents will be a skippable frame, so it will be invisible by the decoder. 952 // n must be > 0 and <= 4MB. 953 // The padded area will be filled with data from crypto/rand.Reader. 954 // The padding will be applied whenever Close is called on the writer. 955 func WriterPadding(n int) WriterOption { 956 return func(w *Writer) error { 957 if n <= 0 { 958 return fmt.Errorf("s2: padding must be at least 1") 959 } 960 // No need to waste our time. 961 if n == 1 { 962 w.pad = 0 963 } 964 if n > maxBlockSize { 965 return fmt.Errorf("s2: padding must less than 4MB") 966 } 967 w.pad = n 968 return nil 969 } 970 } 971 972 // WriterPaddingSrc will get random data for padding from the supplied source. 973 // By default crypto/rand is used. 974 func WriterPaddingSrc(reader io.Reader) WriterOption { 975 return func(w *Writer) error { 976 w.randSrc = reader 977 return nil 978 } 979 } 980 981 // WriterSnappyCompat will write snappy compatible output. 982 // The output can be decompressed using either snappy or s2. 983 // If block size is more than 64KB it is set to that. 984 func WriterSnappyCompat() WriterOption { 985 return func(w *Writer) error { 986 w.snappy = true 987 if w.blockSize > 64<<10 { 988 // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective. 989 // And allows us to skip some size checks. 990 w.blockSize = (64 << 10) - 8 991 } 992 return nil 993 } 994 } 995 996 // WriterFlushOnWrite will compress blocks on each call to the Write function. 997 // 998 // This is quite inefficient as blocks size will depend on the write size. 999 // 1000 // Use WriterConcurrency(1) to also make sure that output is flushed. 1001 // When Write calls return, otherwise they will be written when compression is done. 
func WriterFlushOnWrite() WriterOption {
	return func(w *Writer) error {
		// Every Write call is compressed immediately instead of being
		// buffered up to a full block first.
		w.flushOnWrite = true
		return nil
	}
}

// WriterCustomEncoder allows to override the encoder for blocks on the stream.
// The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
// Block size (initial varint) should not be added by the encoder.
// Returning value 0 indicates the block could not be compressed.
// Returning a negative value indicates that compression should be attempted.
// The function should expect to be called concurrently.
func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
	return func(w *Writer) error {
		w.customEnc = fn
		return nil
	}
}