api-putobject-snowball.go (5360B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2021 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "archive/tar" 22 "bufio" 23 "bytes" 24 "context" 25 "fmt" 26 "io" 27 "os" 28 "strings" 29 "sync" 30 "time" 31 32 "github.com/klauspost/compress/s2" 33 ) 34 35 // SnowballOptions contains options for PutObjectsSnowball calls. 36 type SnowballOptions struct { 37 // Opts is options applied to all objects. 38 Opts PutObjectOptions 39 40 // Processing options: 41 42 // InMemory specifies that all objects should be collected in memory 43 // before they are uploaded. 44 // If false a temporary file will be created. 45 InMemory bool 46 47 // Compress enabled content compression before upload. 48 // Compression will typically reduce memory and network usage, 49 // Compression can safely be enabled with MinIO hosts. 50 Compress bool 51 } 52 53 // SnowballObject contains information about a single object to be added to the snowball. 54 type SnowballObject struct { 55 // Key is the destination key, including prefix. 56 Key string 57 58 // Size is the content size of this object. 59 Size int64 60 61 // Modtime to apply to the object. 62 // If Modtime is the zero value current time will be used. 63 ModTime time.Time 64 65 // Content of the object. 66 // Exactly 'Size' number of bytes must be provided. 67 Content io.Reader 68 69 // Close will be called when an object has finished processing. 70 // Note that if PutObjectsSnowball returns because of an error, 71 // objects not consumed from the input will NOT have been closed. 72 // Leave as nil for no callback. 73 Close func() 74 } 75 76 type nopReadSeekCloser struct { 77 io.ReadSeeker 78 } 79 80 func (n nopReadSeekCloser) Close() error { 81 return nil 82 } 83 84 // This is available as io.ReadSeekCloser from go1.16 85 type readSeekCloser interface { 86 io.Reader 87 io.Closer 88 io.Seeker 89 } 90 91 // PutObjectsSnowball will put multiple objects with a single put call. 92 // A (compressed) TAR file will be created which will contain multiple objects. 93 // The key for each object will be used for the destination in the specified bucket. 94 // Total size should be < 5TB. 95 // This function blocks until 'objs' is closed and the content has been uploaded. 96 func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) { 97 err = opts.Opts.validate() 98 if err != nil { 99 return err 100 } 101 var tmpWriter io.Writer 102 var getTmpReader func() (rc readSeekCloser, sz int64, err error) 103 if opts.InMemory { 104 b := bytes.NewBuffer(nil) 105 tmpWriter = b 106 getTmpReader = func() (readSeekCloser, int64, error) { 107 return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil 108 } 109 } else { 110 f, err := os.CreateTemp("", "s3-putsnowballobjects-*") 111 if err != nil { 112 return err 113 } 114 name := f.Name() 115 tmpWriter = f 116 var once sync.Once 117 defer once.Do(func() { 118 f.Close() 119 }) 120 defer os.Remove(name) 121 getTmpReader = func() (readSeekCloser, int64, error) { 122 once.Do(func() { 123 f.Close() 124 }) 125 f, err := os.Open(name) 126 if err != nil { 127 return nil, 0, err 128 } 129 st, err := f.Stat() 130 if err != nil { 131 return nil, 0, err 132 } 133 return f, st.Size(), nil 134 } 135 } 136 flush := func() error { return nil } 137 if !opts.Compress { 138 if !opts.InMemory { 139 // Insert buffer for writes. 140 buf := bufio.NewWriterSize(tmpWriter, 1<<20) 141 flush = buf.Flush 142 tmpWriter = buf 143 } 144 } else { 145 s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression()) 146 flush = s2c.Close 147 defer s2c.Close() 148 tmpWriter = s2c 149 } 150 t := tar.NewWriter(tmpWriter) 151 152 objectLoop: 153 for { 154 select { 155 case <-ctx.Done(): 156 return ctx.Err() 157 case obj, ok := <-objs: 158 if !ok { 159 break objectLoop 160 } 161 162 closeObj := func() {} 163 if obj.Close != nil { 164 closeObj = obj.Close 165 } 166 167 // Trim accidental slash prefix. 168 obj.Key = strings.TrimPrefix(obj.Key, "/") 169 header := tar.Header{ 170 Typeflag: tar.TypeReg, 171 Name: obj.Key, 172 Size: obj.Size, 173 ModTime: obj.ModTime, 174 Format: tar.FormatPAX, 175 } 176 if header.ModTime.IsZero() { 177 header.ModTime = time.Now().UTC() 178 } 179 180 if err := t.WriteHeader(&header); err != nil { 181 closeObj() 182 return err 183 } 184 n, err := io.Copy(t, obj.Content) 185 if err != nil { 186 closeObj() 187 return err 188 } 189 if n != obj.Size { 190 closeObj() 191 return io.ErrUnexpectedEOF 192 } 193 closeObj() 194 } 195 } 196 // Flush tar 197 err = t.Flush() 198 if err != nil { 199 return err 200 } 201 // Flush compression 202 err = flush() 203 if err != nil { 204 return err 205 } 206 if opts.Opts.UserMetadata == nil { 207 opts.Opts.UserMetadata = map[string]string{} 208 } 209 opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true" 210 opts.Opts.DisableMultipart = true 211 rc, sz, err := getTmpReader() 212 if err != nil { 213 return err 214 } 215 defer rc.Close() 216 rand := c.random.Uint64() 217 _, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts) 218 return err 219 }