gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

writer.go (26597B)


      1 // Copyright 2011 The Snappy-Go Authors. All rights reserved.
      2 // Copyright (c) 2019+ Klaus Post. All rights reserved.
      3 // Use of this source code is governed by a BSD-style
      4 // license that can be found in the LICENSE file.
      5 
      6 package s2
      7 
      8 import (
      9 	"crypto/rand"
     10 	"encoding/binary"
     11 	"errors"
     12 	"fmt"
     13 	"io"
     14 	"runtime"
     15 	"sync"
     16 )
     17 
// Compression levels stored in Writer.level.
// Numbering starts at 1, so the zero value does not name a level.
const (
	// levelUncompressed is selected by WriterUncompressed.
	levelUncompressed = iota + 1
	// levelFast is the level NewWriter starts with.
	levelFast
	// levelBetter is selected by WriterBetterCompression.
	levelBetter
	// levelBest is selected by WriterBestCompression.
	levelBest
)
     24 
     25 // NewWriter returns a new Writer that compresses to w, using the
     26 // framing format described at
     27 // https://github.com/google/snappy/blob/master/framing_format.txt
     28 //
     29 // Users must call Close to guarantee all data has been forwarded to
     30 // the underlying io.Writer and that resources are released.
     31 // They may also call Flush zero or more times before calling Close.
     32 func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
     33 	w2 := Writer{
     34 		blockSize:   defaultBlockSize,
     35 		concurrency: runtime.GOMAXPROCS(0),
     36 		randSrc:     rand.Reader,
     37 		level:       levelFast,
     38 	}
     39 	for _, opt := range opts {
     40 		if err := opt(&w2); err != nil {
     41 			w2.errState = err
     42 			return &w2
     43 		}
     44 	}
     45 	w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
     46 	w2.paramsOK = true
     47 	w2.ibuf = make([]byte, 0, w2.blockSize)
     48 	w2.buffers.New = func() interface{} {
     49 		return make([]byte, w2.obufLen)
     50 	}
     51 	w2.Reset(w)
     52 	return &w2
     53 }
     54 
// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
	// errMu guards errState; all access should go through the err method.
	errMu    sync.Mutex
	// errState is the first error recorded on the Writer.
	errState error

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	// NewWriter gives it a capacity of blockSize.
	ibuf []byte

	// blockSize is the maximum number of uncompressed bytes put in one block.
	blockSize     int
	// obufLen is obufHeaderLen + MaxEncodedLen(blockSize), the length of pooled buffers.
	obufLen       int
	// concurrency of 1 makes all writes synchronous; otherwise it is the
	// capacity of the output queue.
	concurrency   int
	// written counts bytes written to the underlying writer.
	written       int64
	uncompWritten int64 // Bytes sent to compression
	// output queues one channel per pending chunk, in stream order.
	// It is nil when no writer goroutine is running.
	output        chan chan result
	// buffers recycles byte slices of length obufLen.
	buffers       sync.Pool
	// pad is the multiple the output is padded to on close; values <= 1 disable padding.
	pad           int

	// writer is the destination of the compressed stream.
	writer    io.Writer
	// randSrc supplies the bytes used to fill padding frames.
	randSrc   io.Reader
	// writerWg waits for the writer goroutine started by Reset.
	writerWg  sync.WaitGroup
	// index receives the compressed/uncompressed offset of every block written.
	index     Index
	// customEnc, when set, is tried before the built-in block encoders.
	customEnc func(dst, src []byte) int

	// wroteStreamHeader is whether we have written the stream header.
	wroteStreamHeader bool
	// paramsOK is set by NewWriter once all options were applied successfully.
	paramsOK          bool
	// snappy selects Snappy compatible output.
	snappy            bool
	// flushOnWrite makes Write compress its input immediately instead of buffering it.
	flushOnWrite      bool
	// appendIndex makes Close append the index to the stream.
	appendIndex       bool
	// level is one of the level* constants.
	level             uint8
}
     86 
// result is one finished piece of output handed to the writer goroutine.
type result struct {
	// b holds the bytes to write to the underlying writer.
	// An empty b writes nothing; Flush uses that as a synchronization marker.
	b []byte
	// Uncompressed start offset
	startOffset int64
}
     92 
     93 // err returns the previously set error.
     94 // If no error has been set it is set to err if not nil.
     95 func (w *Writer) err(err error) error {
     96 	w.errMu.Lock()
     97 	errSet := w.errState
     98 	if errSet == nil && err != nil {
     99 		w.errState = err
    100 		errSet = err
    101 	}
    102 	w.errMu.Unlock()
    103 	return errSet
    104 }
    105 
    106 // Reset discards the writer's state and switches the Snappy writer to write to w.
    107 // This permits reusing a Writer rather than allocating a new one.
    108 func (w *Writer) Reset(writer io.Writer) {
    109 	if !w.paramsOK {
    110 		return
    111 	}
    112 	// Close previous writer, if any.
    113 	if w.output != nil {
    114 		close(w.output)
    115 		w.writerWg.Wait()
    116 		w.output = nil
    117 	}
    118 	w.errState = nil
    119 	w.ibuf = w.ibuf[:0]
    120 	w.wroteStreamHeader = false
    121 	w.written = 0
    122 	w.writer = writer
    123 	w.uncompWritten = 0
    124 	w.index.reset(w.blockSize)
    125 
    126 	// If we didn't get a writer, stop here.
    127 	if writer == nil {
    128 		return
    129 	}
    130 	// If no concurrency requested, don't spin up writer goroutine.
    131 	if w.concurrency == 1 {
    132 		return
    133 	}
    134 
    135 	toWrite := make(chan chan result, w.concurrency)
    136 	w.output = toWrite
    137 	w.writerWg.Add(1)
    138 
    139 	// Start a writer goroutine that will write all output in order.
    140 	go func() {
    141 		defer w.writerWg.Done()
    142 
    143 		// Get a queued write.
    144 		for write := range toWrite {
    145 			// Wait for the data to be available.
    146 			input := <-write
    147 			in := input.b
    148 			if len(in) > 0 {
    149 				if w.err(nil) == nil {
    150 					// Don't expose data from previous buffers.
    151 					toWrite := in[:len(in):len(in)]
    152 					// Write to output.
    153 					n, err := writer.Write(toWrite)
    154 					if err == nil && n != len(toWrite) {
    155 						err = io.ErrShortBuffer
    156 					}
    157 					_ = w.err(err)
    158 					w.err(w.index.add(w.written, input.startOffset))
    159 					w.written += int64(n)
    160 				}
    161 			}
    162 			if cap(in) >= w.obufLen {
    163 				w.buffers.Put(in)
    164 			}
    165 			// close the incoming write request.
    166 			// This can be used for synchronizing flushes.
    167 			close(write)
    168 		}
    169 	}()
    170 }
    171 
    172 // Write satisfies the io.Writer interface.
    173 func (w *Writer) Write(p []byte) (nRet int, errRet error) {
    174 	if err := w.err(nil); err != nil {
    175 		return 0, err
    176 	}
    177 	if w.flushOnWrite {
    178 		return w.write(p)
    179 	}
    180 	// If we exceed the input buffer size, start writing
    181 	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
    182 		var n int
    183 		if len(w.ibuf) == 0 {
    184 			// Large write, empty buffer.
    185 			// Write directly from p to avoid copy.
    186 			n, _ = w.write(p)
    187 		} else {
    188 			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
    189 			w.ibuf = w.ibuf[:len(w.ibuf)+n]
    190 			w.write(w.ibuf)
    191 			w.ibuf = w.ibuf[:0]
    192 		}
    193 		nRet += n
    194 		p = p[n:]
    195 	}
    196 	if err := w.err(nil); err != nil {
    197 		return nRet, err
    198 	}
    199 	// p should always be able to fit into w.ibuf now.
    200 	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
    201 	w.ibuf = w.ibuf[:len(w.ibuf)+n]
    202 	nRet += n
    203 	return nRet, nil
    204 }
    205 
    206 // ReadFrom implements the io.ReaderFrom interface.
    207 // Using this is typically more efficient since it avoids a memory copy.
    208 // ReadFrom reads data from r until EOF or error.
    209 // The return value n is the number of bytes read.
    210 // Any error except io.EOF encountered during the read is also returned.
    211 func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
    212 	if err := w.err(nil); err != nil {
    213 		return 0, err
    214 	}
    215 	if len(w.ibuf) > 0 {
    216 		err := w.Flush()
    217 		if err != nil {
    218 			return 0, err
    219 		}
    220 	}
    221 	if br, ok := r.(byter); ok {
    222 		buf := br.Bytes()
    223 		if err := w.EncodeBuffer(buf); err != nil {
    224 			return 0, err
    225 		}
    226 		return int64(len(buf)), w.Flush()
    227 	}
    228 	for {
    229 		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
    230 		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
    231 		if err != nil {
    232 			if err == io.ErrUnexpectedEOF {
    233 				err = io.EOF
    234 			}
    235 			if err != io.EOF {
    236 				return n, w.err(err)
    237 			}
    238 		}
    239 		if n2 == 0 {
    240 			break
    241 		}
    242 		n += int64(n2)
    243 		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
    244 		if w.err(err2) != nil {
    245 			break
    246 		}
    247 
    248 		if err != nil {
    249 			// We got EOF and wrote everything
    250 			break
    251 		}
    252 	}
    253 
    254 	return n, w.err(nil)
    255 }
    256 
    257 // AddSkippableBlock will add a skippable block to the stream.
    258 // The ID must be 0x80-0xfe (inclusive).
    259 // Length of the skippable block must be <= 16777215 bytes.
    260 func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
    261 	if err := w.err(nil); err != nil {
    262 		return err
    263 	}
    264 	if len(data) == 0 {
    265 		return nil
    266 	}
    267 	if id < 0x80 || id > chunkTypePadding {
    268 		return fmt.Errorf("invalid skippable block id %x", id)
    269 	}
    270 	if len(data) > maxChunkSize {
    271 		return fmt.Errorf("skippable block excessed maximum size")
    272 	}
    273 	var header [4]byte
    274 	chunkLen := 4 + len(data)
    275 	header[0] = id
    276 	header[1] = uint8(chunkLen >> 0)
    277 	header[2] = uint8(chunkLen >> 8)
    278 	header[3] = uint8(chunkLen >> 16)
    279 	if w.concurrency == 1 {
    280 		write := func(b []byte) error {
    281 			n, err := w.writer.Write(b)
    282 			if err = w.err(err); err != nil {
    283 				return err
    284 			}
    285 			if n != len(data) {
    286 				return w.err(io.ErrShortWrite)
    287 			}
    288 			w.written += int64(n)
    289 			return w.err(nil)
    290 		}
    291 		if !w.wroteStreamHeader {
    292 			w.wroteStreamHeader = true
    293 			if w.snappy {
    294 				if err := write([]byte(magicChunkSnappy)); err != nil {
    295 					return err
    296 				}
    297 			} else {
    298 				if err := write([]byte(magicChunk)); err != nil {
    299 					return err
    300 				}
    301 			}
    302 		}
    303 		if err := write(header[:]); err != nil {
    304 			return err
    305 		}
    306 		if err := write(data); err != nil {
    307 			return err
    308 		}
    309 	}
    310 
    311 	// Create output...
    312 	if !w.wroteStreamHeader {
    313 		w.wroteStreamHeader = true
    314 		hWriter := make(chan result)
    315 		w.output <- hWriter
    316 		if w.snappy {
    317 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
    318 		} else {
    319 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
    320 		}
    321 	}
    322 
    323 	// Copy input.
    324 	inbuf := w.buffers.Get().([]byte)[:4]
    325 	copy(inbuf, header[:])
    326 	inbuf = append(inbuf, data...)
    327 
    328 	output := make(chan result, 1)
    329 	// Queue output.
    330 	w.output <- output
    331 	output <- result{startOffset: w.uncompWritten, b: inbuf}
    332 
    333 	return nil
    334 }
    335 
    336 // EncodeBuffer will add a buffer to the stream.
    337 // This is the fastest way to encode a stream,
    338 // but the input buffer cannot be written to by the caller
    339 // until Flush or Close has been called when concurrency != 1.
    340 //
    341 // If you cannot control that, use the regular Write function.
    342 //
    343 // Note that input is not buffered.
    344 // This means that each write will result in discrete blocks being created.
    345 // For buffered writes, use the regular Write function.
    346 func (w *Writer) EncodeBuffer(buf []byte) (err error) {
    347 	if err := w.err(nil); err != nil {
    348 		return err
    349 	}
    350 
    351 	if w.flushOnWrite {
    352 		_, err := w.write(buf)
    353 		return err
    354 	}
    355 	// Flush queued data first.
    356 	if len(w.ibuf) > 0 {
    357 		err := w.Flush()
    358 		if err != nil {
    359 			return err
    360 		}
    361 	}
    362 	if w.concurrency == 1 {
    363 		_, err := w.writeSync(buf)
    364 		return err
    365 	}
    366 
    367 	// Spawn goroutine and write block to output channel.
    368 	if !w.wroteStreamHeader {
    369 		w.wroteStreamHeader = true
    370 		hWriter := make(chan result)
    371 		w.output <- hWriter
    372 		if w.snappy {
    373 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
    374 		} else {
    375 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
    376 		}
    377 	}
    378 
    379 	for len(buf) > 0 {
    380 		// Cut input.
    381 		uncompressed := buf
    382 		if len(uncompressed) > w.blockSize {
    383 			uncompressed = uncompressed[:w.blockSize]
    384 		}
    385 		buf = buf[len(uncompressed):]
    386 		// Get an output buffer.
    387 		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
    388 		output := make(chan result)
    389 		// Queue output now, so we keep order.
    390 		w.output <- output
    391 		res := result{
    392 			startOffset: w.uncompWritten,
    393 		}
    394 		w.uncompWritten += int64(len(uncompressed))
    395 		go func() {
    396 			checksum := crc(uncompressed)
    397 
    398 			// Set to uncompressed.
    399 			chunkType := uint8(chunkTypeUncompressedData)
    400 			chunkLen := 4 + len(uncompressed)
    401 
    402 			// Attempt compressing.
    403 			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
    404 			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
    405 
    406 			// Check if we should use this, or store as uncompressed instead.
    407 			if n2 > 0 {
    408 				chunkType = uint8(chunkTypeCompressedData)
    409 				chunkLen = 4 + n + n2
    410 				obuf = obuf[:obufHeaderLen+n+n2]
    411 			} else {
    412 				// copy uncompressed
    413 				copy(obuf[obufHeaderLen:], uncompressed)
    414 			}
    415 
    416 			// Fill in the per-chunk header that comes before the body.
    417 			obuf[0] = chunkType
    418 			obuf[1] = uint8(chunkLen >> 0)
    419 			obuf[2] = uint8(chunkLen >> 8)
    420 			obuf[3] = uint8(chunkLen >> 16)
    421 			obuf[4] = uint8(checksum >> 0)
    422 			obuf[5] = uint8(checksum >> 8)
    423 			obuf[6] = uint8(checksum >> 16)
    424 			obuf[7] = uint8(checksum >> 24)
    425 
    426 			// Queue final output.
    427 			res.b = obuf
    428 			output <- res
    429 		}()
    430 	}
    431 	return nil
    432 }
    433 
    434 func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
    435 	if w.customEnc != nil {
    436 		if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
    437 			return ret
    438 		}
    439 	}
    440 	if w.snappy {
    441 		switch w.level {
    442 		case levelFast:
    443 			return encodeBlockSnappy(obuf, uncompressed)
    444 		case levelBetter:
    445 			return encodeBlockBetterSnappy(obuf, uncompressed)
    446 		case levelBest:
    447 			return encodeBlockBestSnappy(obuf, uncompressed)
    448 		}
    449 		return 0
    450 	}
    451 	switch w.level {
    452 	case levelFast:
    453 		return encodeBlock(obuf, uncompressed)
    454 	case levelBetter:
    455 		return encodeBlockBetter(obuf, uncompressed)
    456 	case levelBest:
    457 		return encodeBlockBest(obuf, uncompressed, nil)
    458 	}
    459 	return 0
    460 }
    461 
    462 func (w *Writer) write(p []byte) (nRet int, errRet error) {
    463 	if err := w.err(nil); err != nil {
    464 		return 0, err
    465 	}
    466 	if w.concurrency == 1 {
    467 		return w.writeSync(p)
    468 	}
    469 
    470 	// Spawn goroutine and write block to output channel.
    471 	for len(p) > 0 {
    472 		if !w.wroteStreamHeader {
    473 			w.wroteStreamHeader = true
    474 			hWriter := make(chan result)
    475 			w.output <- hWriter
    476 			if w.snappy {
    477 				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
    478 			} else {
    479 				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
    480 			}
    481 		}
    482 
    483 		var uncompressed []byte
    484 		if len(p) > w.blockSize {
    485 			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
    486 		} else {
    487 			uncompressed, p = p, nil
    488 		}
    489 
    490 		// Copy input.
    491 		// If the block is incompressible, this is used for the result.
    492 		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
    493 		obuf := w.buffers.Get().([]byte)[:w.obufLen]
    494 		copy(inbuf[obufHeaderLen:], uncompressed)
    495 		uncompressed = inbuf[obufHeaderLen:]
    496 
    497 		output := make(chan result)
    498 		// Queue output now, so we keep order.
    499 		w.output <- output
    500 		res := result{
    501 			startOffset: w.uncompWritten,
    502 		}
    503 		w.uncompWritten += int64(len(uncompressed))
    504 
    505 		go func() {
    506 			checksum := crc(uncompressed)
    507 
    508 			// Set to uncompressed.
    509 			chunkType := uint8(chunkTypeUncompressedData)
    510 			chunkLen := 4 + len(uncompressed)
    511 
    512 			// Attempt compressing.
    513 			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
    514 			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
    515 
    516 			// Check if we should use this, or store as uncompressed instead.
    517 			if n2 > 0 {
    518 				chunkType = uint8(chunkTypeCompressedData)
    519 				chunkLen = 4 + n + n2
    520 				obuf = obuf[:obufHeaderLen+n+n2]
    521 			} else {
    522 				// Use input as output.
    523 				obuf, inbuf = inbuf, obuf
    524 			}
    525 
    526 			// Fill in the per-chunk header that comes before the body.
    527 			obuf[0] = chunkType
    528 			obuf[1] = uint8(chunkLen >> 0)
    529 			obuf[2] = uint8(chunkLen >> 8)
    530 			obuf[3] = uint8(chunkLen >> 16)
    531 			obuf[4] = uint8(checksum >> 0)
    532 			obuf[5] = uint8(checksum >> 8)
    533 			obuf[6] = uint8(checksum >> 16)
    534 			obuf[7] = uint8(checksum >> 24)
    535 
    536 			// Queue final output.
    537 			res.b = obuf
    538 			output <- res
    539 
    540 			// Put unused buffer back in pool.
    541 			w.buffers.Put(inbuf)
    542 		}()
    543 		nRet += len(uncompressed)
    544 	}
    545 	return nRet, nil
    546 }
    547 
    548 // writeFull is a special version of write that will always write the full buffer.
    549 // Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
    550 // The data will be written as a single block.
    551 // The caller is not allowed to use inbuf after this function has been called.
    552 func (w *Writer) writeFull(inbuf []byte) (errRet error) {
    553 	if err := w.err(nil); err != nil {
    554 		return err
    555 	}
    556 
    557 	if w.concurrency == 1 {
    558 		_, err := w.writeSync(inbuf[obufHeaderLen:])
    559 		return err
    560 	}
    561 
    562 	// Spawn goroutine and write block to output channel.
    563 	if !w.wroteStreamHeader {
    564 		w.wroteStreamHeader = true
    565 		hWriter := make(chan result)
    566 		w.output <- hWriter
    567 		if w.snappy {
    568 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
    569 		} else {
    570 			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
    571 		}
    572 	}
    573 
    574 	// Get an output buffer.
    575 	obuf := w.buffers.Get().([]byte)[:w.obufLen]
    576 	uncompressed := inbuf[obufHeaderLen:]
    577 
    578 	output := make(chan result)
    579 	// Queue output now, so we keep order.
    580 	w.output <- output
    581 	res := result{
    582 		startOffset: w.uncompWritten,
    583 	}
    584 	w.uncompWritten += int64(len(uncompressed))
    585 
    586 	go func() {
    587 		checksum := crc(uncompressed)
    588 
    589 		// Set to uncompressed.
    590 		chunkType := uint8(chunkTypeUncompressedData)
    591 		chunkLen := 4 + len(uncompressed)
    592 
    593 		// Attempt compressing.
    594 		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
    595 		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
    596 
    597 		// Check if we should use this, or store as uncompressed instead.
    598 		if n2 > 0 {
    599 			chunkType = uint8(chunkTypeCompressedData)
    600 			chunkLen = 4 + n + n2
    601 			obuf = obuf[:obufHeaderLen+n+n2]
    602 		} else {
    603 			// Use input as output.
    604 			obuf, inbuf = inbuf, obuf
    605 		}
    606 
    607 		// Fill in the per-chunk header that comes before the body.
    608 		obuf[0] = chunkType
    609 		obuf[1] = uint8(chunkLen >> 0)
    610 		obuf[2] = uint8(chunkLen >> 8)
    611 		obuf[3] = uint8(chunkLen >> 16)
    612 		obuf[4] = uint8(checksum >> 0)
    613 		obuf[5] = uint8(checksum >> 8)
    614 		obuf[6] = uint8(checksum >> 16)
    615 		obuf[7] = uint8(checksum >> 24)
    616 
    617 		// Queue final output.
    618 		res.b = obuf
    619 		output <- res
    620 
    621 		// Put unused buffer back in pool.
    622 		w.buffers.Put(inbuf)
    623 	}()
    624 	return nil
    625 }
    626 
    627 func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
    628 	if err := w.err(nil); err != nil {
    629 		return 0, err
    630 	}
    631 	if !w.wroteStreamHeader {
    632 		w.wroteStreamHeader = true
    633 		var n int
    634 		var err error
    635 		if w.snappy {
    636 			n, err = w.writer.Write([]byte(magicChunkSnappy))
    637 		} else {
    638 			n, err = w.writer.Write([]byte(magicChunk))
    639 		}
    640 		if err != nil {
    641 			return 0, w.err(err)
    642 		}
    643 		if n != len(magicChunk) {
    644 			return 0, w.err(io.ErrShortWrite)
    645 		}
    646 		w.written += int64(n)
    647 	}
    648 
    649 	for len(p) > 0 {
    650 		var uncompressed []byte
    651 		if len(p) > w.blockSize {
    652 			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
    653 		} else {
    654 			uncompressed, p = p, nil
    655 		}
    656 
    657 		obuf := w.buffers.Get().([]byte)[:w.obufLen]
    658 		checksum := crc(uncompressed)
    659 
    660 		// Set to uncompressed.
    661 		chunkType := uint8(chunkTypeUncompressedData)
    662 		chunkLen := 4 + len(uncompressed)
    663 
    664 		// Attempt compressing.
    665 		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
    666 		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
    667 
    668 		if n2 > 0 {
    669 			chunkType = uint8(chunkTypeCompressedData)
    670 			chunkLen = 4 + n + n2
    671 			obuf = obuf[:obufHeaderLen+n+n2]
    672 		} else {
    673 			obuf = obuf[:8]
    674 		}
    675 
    676 		// Fill in the per-chunk header that comes before the body.
    677 		obuf[0] = chunkType
    678 		obuf[1] = uint8(chunkLen >> 0)
    679 		obuf[2] = uint8(chunkLen >> 8)
    680 		obuf[3] = uint8(chunkLen >> 16)
    681 		obuf[4] = uint8(checksum >> 0)
    682 		obuf[5] = uint8(checksum >> 8)
    683 		obuf[6] = uint8(checksum >> 16)
    684 		obuf[7] = uint8(checksum >> 24)
    685 
    686 		n, err := w.writer.Write(obuf)
    687 		if err != nil {
    688 			return 0, w.err(err)
    689 		}
    690 		if n != len(obuf) {
    691 			return 0, w.err(io.ErrShortWrite)
    692 		}
    693 		w.err(w.index.add(w.written, w.uncompWritten))
    694 		w.written += int64(n)
    695 		w.uncompWritten += int64(len(uncompressed))
    696 
    697 		if chunkType == chunkTypeUncompressedData {
    698 			// Write uncompressed data.
    699 			n, err := w.writer.Write(uncompressed)
    700 			if err != nil {
    701 				return 0, w.err(err)
    702 			}
    703 			if n != len(uncompressed) {
    704 				return 0, w.err(io.ErrShortWrite)
    705 			}
    706 			w.written += int64(n)
    707 		}
    708 		w.buffers.Put(obuf)
    709 		// Queue final output.
    710 		nRet += len(uncompressed)
    711 	}
    712 	return nRet, nil
    713 }
    714 
    715 // Flush flushes the Writer to its underlying io.Writer.
    716 // This does not apply padding.
    717 func (w *Writer) Flush() error {
    718 	if err := w.err(nil); err != nil {
    719 		return err
    720 	}
    721 
    722 	// Queue any data still in input buffer.
    723 	if len(w.ibuf) != 0 {
    724 		if !w.wroteStreamHeader {
    725 			_, err := w.writeSync(w.ibuf)
    726 			w.ibuf = w.ibuf[:0]
    727 			return w.err(err)
    728 		} else {
    729 			_, err := w.write(w.ibuf)
    730 			w.ibuf = w.ibuf[:0]
    731 			err = w.err(err)
    732 			if err != nil {
    733 				return err
    734 			}
    735 		}
    736 	}
    737 	if w.output == nil {
    738 		return w.err(nil)
    739 	}
    740 
    741 	// Send empty buffer
    742 	res := make(chan result)
    743 	w.output <- res
    744 	// Block until this has been picked up.
    745 	res <- result{b: nil, startOffset: w.uncompWritten}
    746 	// When it is closed, we have flushed.
    747 	<-res
    748 	return w.err(nil)
    749 }
    750 
    751 // Close calls Flush and then closes the Writer.
    752 // Calling Close multiple times is ok,
    753 // but calling CloseIndex after this will make it not return the index.
    754 func (w *Writer) Close() error {
    755 	_, err := w.closeIndex(w.appendIndex)
    756 	return err
    757 }
    758 
    759 // CloseIndex calls Close and returns an index on first call.
    760 // This is not required if you are only adding index to a stream.
    761 func (w *Writer) CloseIndex() ([]byte, error) {
    762 	return w.closeIndex(true)
    763 }
    764 
    765 func (w *Writer) closeIndex(idx bool) ([]byte, error) {
    766 	err := w.Flush()
    767 	if w.output != nil {
    768 		close(w.output)
    769 		w.writerWg.Wait()
    770 		w.output = nil
    771 	}
    772 
    773 	var index []byte
    774 	if w.err(nil) == nil && w.writer != nil {
    775 		// Create index.
    776 		if idx {
    777 			compSize := int64(-1)
    778 			if w.pad <= 1 {
    779 				compSize = w.written
    780 			}
    781 			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
    782 			// Count as written for padding.
    783 			if w.appendIndex {
    784 				w.written += int64(len(index))
    785 			}
    786 		}
    787 
    788 		if w.pad > 1 {
    789 			tmp := w.ibuf[:0]
    790 			if len(index) > 0 {
    791 				// Allocate another buffer.
    792 				tmp = w.buffers.Get().([]byte)[:0]
    793 				defer w.buffers.Put(tmp)
    794 			}
    795 			add := calcSkippableFrame(w.written, int64(w.pad))
    796 			frame, err := skippableFrame(tmp, add, w.randSrc)
    797 			if err = w.err(err); err != nil {
    798 				return nil, err
    799 			}
    800 			n, err2 := w.writer.Write(frame)
    801 			if err2 == nil && n != len(frame) {
    802 				err2 = io.ErrShortWrite
    803 			}
    804 			_ = w.err(err2)
    805 		}
    806 		if len(index) > 0 && w.appendIndex {
    807 			n, err2 := w.writer.Write(index)
    808 			if err2 == nil && n != len(index) {
    809 				err2 = io.ErrShortWrite
    810 			}
    811 			_ = w.err(err2)
    812 		}
    813 	}
    814 	err = w.err(errClosed)
    815 	if err == errClosed {
    816 		return index, nil
    817 	}
    818 	return nil, err
    819 }
    820 
    821 // calcSkippableFrame will return a total size to be added for written
    822 // to be divisible by multiple.
    823 // The value will always be > skippableFrameHeader.
    824 // The function will panic if written < 0 or wantMultiple <= 0.
    825 func calcSkippableFrame(written, wantMultiple int64) int {
    826 	if wantMultiple <= 0 {
    827 		panic("wantMultiple <= 0")
    828 	}
    829 	if written < 0 {
    830 		panic("written < 0")
    831 	}
    832 	leftOver := written % wantMultiple
    833 	if leftOver == 0 {
    834 		return 0
    835 	}
    836 	toAdd := wantMultiple - leftOver
    837 	for toAdd < skippableFrameHeader {
    838 		toAdd += wantMultiple
    839 	}
    840 	return int(toAdd)
    841 }
    842 
    843 // skippableFrame will add a skippable frame with a total size of bytes.
    844 // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
    845 func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
    846 	if total == 0 {
    847 		return dst, nil
    848 	}
    849 	if total < skippableFrameHeader {
    850 		return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
    851 	}
    852 	if int64(total) >= maxBlockSize+skippableFrameHeader {
    853 		return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
    854 	}
    855 	// Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
    856 	dst = append(dst, chunkTypePadding)
    857 	f := uint32(total - skippableFrameHeader)
    858 	// Add chunk length.
    859 	dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
    860 	// Add data
    861 	start := len(dst)
    862 	dst = append(dst, make([]byte, f)...)
    863 	_, err := io.ReadFull(r, dst[start:])
    864 	return dst, err
    865 }
    866 
// errClosed is recorded as the Writer's error when it is closed, so that
// later operations fail.
var errClosed = errors.New("s2: Writer is closed")
    868 
// WriterOption is an option for creating an encoder.
// Options are applied by NewWriter in the order given; a non-nil error
// stops processing.
type WriterOption func(*Writer) error
    871 
    872 // WriterConcurrency will set the concurrency,
    873 // meaning the maximum number of decoders to run concurrently.
    874 // The value supplied must be at least 1.
    875 // By default this will be set to GOMAXPROCS.
    876 func WriterConcurrency(n int) WriterOption {
    877 	return func(w *Writer) error {
    878 		if n <= 0 {
    879 			return errors.New("concurrency must be at least 1")
    880 		}
    881 		w.concurrency = n
    882 		return nil
    883 	}
    884 }
    885 
    886 // WriterAddIndex will append an index to the end of a stream
    887 // when it is closed.
    888 func WriterAddIndex() WriterOption {
    889 	return func(w *Writer) error {
    890 		w.appendIndex = true
    891 		return nil
    892 	}
    893 }
    894 
    895 // WriterBetterCompression will enable better compression.
    896 // EncodeBetter compresses better than Encode but typically with a
    897 // 10-40% speed decrease on both compression and decompression.
    898 func WriterBetterCompression() WriterOption {
    899 	return func(w *Writer) error {
    900 		w.level = levelBetter
    901 		return nil
    902 	}
    903 }
    904 
    905 // WriterBestCompression will enable better compression.
    906 // EncodeBetter compresses better than Encode but typically with a
    907 // big speed decrease on compression.
    908 func WriterBestCompression() WriterOption {
    909 	return func(w *Writer) error {
    910 		w.level = levelBest
    911 		return nil
    912 	}
    913 }
    914 
    915 // WriterUncompressed will bypass compression.
    916 // The stream will be written as uncompressed blocks only.
    917 // If concurrency is > 1 CRC and output will still be done async.
    918 func WriterUncompressed() WriterOption {
    919 	return func(w *Writer) error {
    920 		w.level = levelUncompressed
    921 		return nil
    922 	}
    923 }
    924 
    925 // WriterBlockSize allows to override the default block size.
    926 // Blocks will be this size or smaller.
    927 // Minimum size is 4KB and and maximum size is 4MB.
    928 //
    929 // Bigger blocks may give bigger throughput on systems with many cores,
    930 // and will increase compression slightly, but it will limit the possible
    931 // concurrency for smaller payloads for both encoding and decoding.
    932 // Default block size is 1MB.
    933 //
    934 // When writing Snappy compatible output using WriterSnappyCompat,
    935 // the maximum block size is 64KB.
    936 func WriterBlockSize(n int) WriterOption {
    937 	return func(w *Writer) error {
    938 		if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
    939 			return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
    940 		}
    941 		if n > maxBlockSize || n < minBlockSize {
    942 			return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
    943 		}
    944 		w.blockSize = n
    945 		return nil
    946 	}
    947 }
    948 
    949 // WriterPadding will add padding to all output so the size will be a multiple of n.
    950 // This can be used to obfuscate the exact output size or make blocks of a certain size.
    951 // The contents will be a skippable frame, so it will be invisible by the decoder.
    952 // n must be > 0 and <= 4MB.
    953 // The padded area will be filled with data from crypto/rand.Reader.
    954 // The padding will be applied whenever Close is called on the writer.
    955 func WriterPadding(n int) WriterOption {
    956 	return func(w *Writer) error {
    957 		if n <= 0 {
    958 			return fmt.Errorf("s2: padding must be at least 1")
    959 		}
    960 		// No need to waste our time.
    961 		if n == 1 {
    962 			w.pad = 0
    963 		}
    964 		if n > maxBlockSize {
    965 			return fmt.Errorf("s2: padding must less than 4MB")
    966 		}
    967 		w.pad = n
    968 		return nil
    969 	}
    970 }
    971 
    972 // WriterPaddingSrc will get random data for padding from the supplied source.
    973 // By default crypto/rand is used.
    974 func WriterPaddingSrc(reader io.Reader) WriterOption {
    975 	return func(w *Writer) error {
    976 		w.randSrc = reader
    977 		return nil
    978 	}
    979 }
    980 
    981 // WriterSnappyCompat will write snappy compatible output.
    982 // The output can be decompressed using either snappy or s2.
    983 // If block size is more than 64KB it is set to that.
    984 func WriterSnappyCompat() WriterOption {
    985 	return func(w *Writer) error {
    986 		w.snappy = true
    987 		if w.blockSize > 64<<10 {
    988 			// We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
    989 			// And allows us to skip some size checks.
    990 			w.blockSize = (64 << 10) - 8
    991 		}
    992 		return nil
    993 	}
    994 }
    995 
    996 // WriterFlushOnWrite will compress blocks on each call to the Write function.
    997 //
    998 // This is quite inefficient as blocks size will depend on the write size.
    999 //
   1000 // Use WriterConcurrency(1) to also make sure that output is flushed.
   1001 // When Write calls return, otherwise they will be written when compression is done.
   1002 func WriterFlushOnWrite() WriterOption {
   1003 	return func(w *Writer) error {
   1004 		w.flushOnWrite = true
   1005 		return nil
   1006 	}
   1007 }
   1008 
   1009 // WriterCustomEncoder allows to override the encoder for blocks on the stream.
   1010 // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
   1011 // Block size (initial varint) should not be added by the encoder.
   1012 // Returning value 0 indicates the block could not be compressed.
   1013 // Returning a negative value indicates that compression should be attempted.
   1014 // The function should expect to be called concurrently.
   1015 func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
   1016 	return func(w *Writer) error {
   1017 		w.customEnc = fn
   1018 		return nil
   1019 	}
   1020 }