gzip.go (3311B)
1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // Package gzip implements and registers the gzip compressor 20 // during the initialization. 21 // 22 // # Experimental 23 // 24 // Notice: This package is EXPERIMENTAL and may be changed or removed in a 25 // later release. 26 package gzip 27 28 import ( 29 "compress/gzip" 30 "encoding/binary" 31 "fmt" 32 "io" 33 "sync" 34 35 "google.golang.org/grpc/encoding" 36 ) 37 38 // Name is the name registered for the gzip compressor. 39 const Name = "gzip" 40 41 func init() { 42 c := &compressor{} 43 c.poolCompressor.New = func() interface{} { 44 return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor} 45 } 46 encoding.RegisterCompressor(c) 47 } 48 49 type writer struct { 50 *gzip.Writer 51 pool *sync.Pool 52 } 53 54 // SetLevel updates the registered gzip compressor to use the compression level specified (gzip.HuffmanOnly is not supported). 55 // NOTE: this function must only be called during initialization time (i.e. in an init() function), 56 // and is not thread-safe. 57 // 58 // The error returned will be nil if the specified level is valid. 59 func SetLevel(level int) error { 60 if level < gzip.DefaultCompression || level > gzip.BestCompression { 61 return fmt.Errorf("grpc: invalid gzip compression level: %d", level) 62 } 63 c := encoding.GetCompressor(Name).(*compressor) 64 c.poolCompressor.New = func() interface{} { 65 w, err := gzip.NewWriterLevel(io.Discard, level) 66 if err != nil { 67 panic(err) 68 } 69 return &writer{Writer: w, pool: &c.poolCompressor} 70 } 71 return nil 72 } 73 74 func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { 75 z := c.poolCompressor.Get().(*writer) 76 z.Writer.Reset(w) 77 return z, nil 78 } 79 80 func (z *writer) Close() error { 81 defer z.pool.Put(z) 82 return z.Writer.Close() 83 } 84 85 type reader struct { 86 *gzip.Reader 87 pool *sync.Pool 88 } 89 90 func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { 91 z, inPool := c.poolDecompressor.Get().(*reader) 92 if !inPool { 93 newZ, err := gzip.NewReader(r) 94 if err != nil { 95 return nil, err 96 } 97 return &reader{Reader: newZ, pool: &c.poolDecompressor}, nil 98 } 99 if err := z.Reset(r); err != nil { 100 c.poolDecompressor.Put(z) 101 return nil, err 102 } 103 return z, nil 104 } 105 106 func (z *reader) Read(p []byte) (n int, err error) { 107 n, err = z.Reader.Read(p) 108 if err == io.EOF { 109 z.pool.Put(z) 110 } 111 return n, err 112 } 113 114 // RFC1952 specifies that the last four bytes "contains the size of 115 // the original (uncompressed) input data modulo 2^32." 116 // gRPC has a max message size of 2GB so we don't need to worry about wraparound. 117 func (c *compressor) DecompressedSize(buf []byte) int { 118 last := len(buf) 119 if last < 4 { 120 return -1 121 } 122 return int(binary.LittleEndian.Uint32(buf[last-4 : last])) 123 } 124 125 func (c *compressor) Name() string { 126 return Name 127 } 128 129 type compressor struct { 130 poolCompressor sync.Pool 131 poolDecompressor sync.Pool 132 }