api-put-object.go (14861B)
/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2015-2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"net/http"
	"sort"
	"time"

	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
	"golang.org/x/net/http/httpguts"
)

// ReplicationStatus represents the replication status of an object. It is a
// plain string type; the zero value ("") means that no status has been set.
type ReplicationStatus string

const (
	// ReplicationStatusPending indicates replication is pending.
	ReplicationStatusPending ReplicationStatus = "PENDING"
	// ReplicationStatusComplete indicates replication completed ok.
	ReplicationStatusComplete ReplicationStatus = "COMPLETED"
	// ReplicationStatusFailed indicates replication failed.
	ReplicationStatusFailed ReplicationStatus = "FAILED"
	// ReplicationStatusReplica indicates object is a replica of a source.
	ReplicationStatusReplica ReplicationStatus = "REPLICA"
)

// Empty returns true if no replication status set.
52 func (r ReplicationStatus) Empty() bool { 53 return r == "" 54 } 55 56 // AdvancedPutOptions for internal use - to be utilized by replication, ILM transition 57 // implementation on MinIO server 58 type AdvancedPutOptions struct { 59 SourceVersionID string 60 SourceETag string 61 ReplicationStatus ReplicationStatus 62 SourceMTime time.Time 63 ReplicationRequest bool 64 RetentionTimestamp time.Time 65 TaggingTimestamp time.Time 66 LegalholdTimestamp time.Time 67 } 68 69 // PutObjectOptions represents options specified by user for PutObject call 70 type PutObjectOptions struct { 71 UserMetadata map[string]string 72 UserTags map[string]string 73 Progress io.Reader 74 ContentType string 75 ContentEncoding string 76 ContentDisposition string 77 ContentLanguage string 78 CacheControl string 79 Mode RetentionMode 80 RetainUntilDate time.Time 81 ServerSideEncryption encrypt.ServerSide 82 NumThreads uint 83 StorageClass string 84 WebsiteRedirectLocation string 85 PartSize uint64 86 LegalHold LegalHoldStatus 87 SendContentMd5 bool 88 DisableContentSha256 bool 89 DisableMultipart bool 90 91 // ConcurrentStreamParts will create NumThreads buffers of PartSize bytes, 92 // fill them serially and upload them in parallel. 93 // This can be used for faster uploads on non-seekable or slow-to-seek input. 94 ConcurrentStreamParts bool 95 Internal AdvancedPutOptions 96 97 customHeaders http.Header 98 } 99 100 // SetMatchETag if etag matches while PUT MinIO returns an error 101 // this is a MinIO specific extension to support optimistic locking 102 // semantics. 103 func (opts *PutObjectOptions) SetMatchETag(etag string) { 104 if opts.customHeaders == nil { 105 opts.customHeaders = http.Header{} 106 } 107 opts.customHeaders.Set("If-Match", "\""+etag+"\"") 108 } 109 110 // SetMatchETagExcept if etag does not match while PUT MinIO returns an 111 // error this is a MinIO specific extension to support optimistic locking 112 // semantics. 
113 func (opts *PutObjectOptions) SetMatchETagExcept(etag string) { 114 if opts.customHeaders == nil { 115 opts.customHeaders = http.Header{} 116 } 117 opts.customHeaders.Set("If-None-Match", "\""+etag+"\"") 118 } 119 120 // getNumThreads - gets the number of threads to be used in the multipart 121 // put object operation 122 func (opts PutObjectOptions) getNumThreads() (numThreads int) { 123 if opts.NumThreads > 0 { 124 numThreads = int(opts.NumThreads) 125 } else { 126 numThreads = totalWorkers 127 } 128 return 129 } 130 131 // Header - constructs the headers from metadata entered by user in 132 // PutObjectOptions struct 133 func (opts PutObjectOptions) Header() (header http.Header) { 134 header = make(http.Header) 135 136 contentType := opts.ContentType 137 if contentType == "" { 138 contentType = "application/octet-stream" 139 } 140 header.Set("Content-Type", contentType) 141 142 if opts.ContentEncoding != "" { 143 header.Set("Content-Encoding", opts.ContentEncoding) 144 } 145 if opts.ContentDisposition != "" { 146 header.Set("Content-Disposition", opts.ContentDisposition) 147 } 148 if opts.ContentLanguage != "" { 149 header.Set("Content-Language", opts.ContentLanguage) 150 } 151 if opts.CacheControl != "" { 152 header.Set("Cache-Control", opts.CacheControl) 153 } 154 155 if opts.Mode != "" { 156 header.Set(amzLockMode, opts.Mode.String()) 157 } 158 159 if !opts.RetainUntilDate.IsZero() { 160 header.Set("X-Amz-Object-Lock-Retain-Until-Date", opts.RetainUntilDate.Format(time.RFC3339)) 161 } 162 163 if opts.LegalHold != "" { 164 header.Set(amzLegalHoldHeader, opts.LegalHold.String()) 165 } 166 167 if opts.ServerSideEncryption != nil { 168 opts.ServerSideEncryption.Marshal(header) 169 } 170 171 if opts.StorageClass != "" { 172 header.Set(amzStorageClass, opts.StorageClass) 173 } 174 175 if opts.WebsiteRedirectLocation != "" { 176 header.Set(amzWebsiteRedirectLocation, opts.WebsiteRedirectLocation) 177 } 178 179 if !opts.Internal.ReplicationStatus.Empty() { 180 
header.Set(amzBucketReplicationStatus, string(opts.Internal.ReplicationStatus)) 181 } 182 if !opts.Internal.SourceMTime.IsZero() { 183 header.Set(minIOBucketSourceMTime, opts.Internal.SourceMTime.Format(time.RFC3339Nano)) 184 } 185 if opts.Internal.SourceETag != "" { 186 header.Set(minIOBucketSourceETag, opts.Internal.SourceETag) 187 } 188 if opts.Internal.ReplicationRequest { 189 header.Set(minIOBucketReplicationRequest, "true") 190 } 191 if !opts.Internal.LegalholdTimestamp.IsZero() { 192 header.Set(minIOBucketReplicationObjectLegalHoldTimestamp, opts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano)) 193 } 194 if !opts.Internal.RetentionTimestamp.IsZero() { 195 header.Set(minIOBucketReplicationObjectRetentionTimestamp, opts.Internal.RetentionTimestamp.Format(time.RFC3339Nano)) 196 } 197 if !opts.Internal.TaggingTimestamp.IsZero() { 198 header.Set(minIOBucketReplicationTaggingTimestamp, opts.Internal.TaggingTimestamp.Format(time.RFC3339Nano)) 199 } 200 201 if len(opts.UserTags) != 0 { 202 header.Set(amzTaggingHeader, s3utils.TagEncode(opts.UserTags)) 203 } 204 205 for k, v := range opts.UserMetadata { 206 if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) { 207 header.Set(k, v) 208 } else { 209 header.Set("x-amz-meta-"+k, v) 210 } 211 } 212 213 // set any other additional custom headers. 214 for k, v := range opts.customHeaders { 215 header[k] = v 216 } 217 218 return 219 } 220 221 // validate() checks if the UserMetadata map has standard headers or and raises an error if so. 
222 func (opts PutObjectOptions) validate() (err error) { 223 for k, v := range opts.UserMetadata { 224 if !httpguts.ValidHeaderFieldName(k) || isStandardHeader(k) || isSSEHeader(k) || isStorageClassHeader(k) { 225 return errInvalidArgument(k + " unsupported user defined metadata name") 226 } 227 if !httpguts.ValidHeaderFieldValue(v) { 228 return errInvalidArgument(v + " unsupported user defined metadata value") 229 } 230 } 231 if opts.Mode != "" && !opts.Mode.IsValid() { 232 return errInvalidArgument(opts.Mode.String() + " unsupported retention mode") 233 } 234 if opts.LegalHold != "" && !opts.LegalHold.IsValid() { 235 return errInvalidArgument(opts.LegalHold.String() + " unsupported legal-hold status") 236 } 237 return nil 238 } 239 240 // completedParts is a collection of parts sortable by their part numbers. 241 // used for sorting the uploaded parts before completing the multipart request. 242 type completedParts []CompletePart 243 244 func (a completedParts) Len() int { return len(a) } 245 func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 246 func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } 247 248 // PutObject creates an object in a bucket. 249 // 250 // You must have WRITE permissions on a bucket to create an object. 251 // 252 // - For size smaller than 16MiB PutObject automatically does a 253 // single atomic PUT operation. 254 // 255 // - For size larger than 16MiB PutObject automatically does a 256 // multipart upload operation. 257 // 258 // - For size input as -1 PutObject does a multipart Put operation 259 // until input stream reaches EOF. Maximum object size that can 260 // be uploaded through this operation will be 5TiB. 261 // 262 // WARNING: Passing down '-1' will use memory and these cannot 263 // be reused for best outcomes for PutObject(), pass the size always. 264 // 265 // NOTE: Upon errors during upload multipart operation is entirely aborted. 
266 func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, 267 opts PutObjectOptions, 268 ) (info UploadInfo, err error) { 269 if objectSize < 0 && opts.DisableMultipart { 270 return UploadInfo{}, errors.New("object size must be provided with disable multipart upload") 271 } 272 273 err = opts.validate() 274 if err != nil { 275 return UploadInfo{}, err 276 } 277 278 return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts) 279 } 280 281 func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) { 282 // Check for largest object size allowed. 283 if size > int64(maxMultipartPutObjectSize) { 284 return UploadInfo{}, errEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName) 285 } 286 287 // NOTE: Streaming signature is not supported by GCS. 288 if s3utils.IsGoogleEndpoint(*c.endpointURL) { 289 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 290 } 291 292 partSize := opts.PartSize 293 if opts.PartSize == 0 { 294 partSize = minPartSize 295 } 296 297 if c.overrideSignerType.IsV2() { 298 if size >= 0 && size < int64(partSize) || opts.DisableMultipart { 299 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 300 } 301 return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts) 302 } 303 304 if size < 0 { 305 if opts.DisableMultipart { 306 return UploadInfo{}, errors.New("no length provided and multipart disabled") 307 } 308 if opts.ConcurrentStreamParts && opts.NumThreads > 1 { 309 return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts) 310 } 311 return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts) 312 } 313 314 if size < int64(partSize) || opts.DisableMultipart { 315 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 316 } 317 318 return 
c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts) 319 } 320 321 func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) { 322 // Input validation. 323 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 324 return UploadInfo{}, err 325 } 326 if err = s3utils.CheckValidObjectName(objectName); err != nil { 327 return UploadInfo{}, err 328 } 329 330 // Total data read and written to server. should be equal to 331 // 'size' at the end of the call. 332 var totalUploadedSize int64 333 334 // Complete multipart upload. 335 var complMultipartUpload completeMultipartUpload 336 337 // Calculate the optimal parts info for a given size. 338 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) 339 if err != nil { 340 return UploadInfo{}, err 341 } 342 343 if !opts.SendContentMd5 { 344 if opts.UserMetadata == nil { 345 opts.UserMetadata = make(map[string]string, 1) 346 } 347 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" 348 } 349 350 // Initiate a new multipart upload. 351 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 352 if err != nil { 353 return UploadInfo{}, err 354 } 355 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") 356 357 defer func() { 358 if err != nil { 359 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 360 } 361 }() 362 363 // Part number always starts with '1'. 364 partNumber := 1 365 366 // Initialize parts uploaded map. 367 partsInfo := make(map[int]ObjectPart) 368 369 // Create a buffer. 
370 buf := make([]byte, partSize) 371 372 // Create checksums 373 // CRC32C is ~50% faster on AMD64 @ 30GB/s 374 var crcBytes []byte 375 customHeader := make(http.Header) 376 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 377 378 for partNumber <= totalPartsCount { 379 length, rerr := readFull(reader, buf) 380 if rerr == io.EOF && partNumber > 1 { 381 break 382 } 383 384 if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF { 385 return UploadInfo{}, rerr 386 } 387 388 var md5Base64 string 389 if opts.SendContentMd5 { 390 // Calculate md5sum. 391 hash := c.md5Hasher() 392 hash.Write(buf[:length]) 393 md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) 394 hash.Close() 395 } else { 396 crc.Reset() 397 crc.Write(buf[:length]) 398 cSum := crc.Sum(nil) 399 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) 400 crcBytes = append(crcBytes, cSum...) 401 } 402 403 // Update progress reader appropriately to the latest offset 404 // as we read from the source. 405 rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) 406 407 // Proceed to upload the part. 408 p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader} 409 objPart, uerr := c.uploadPart(ctx, p) 410 if uerr != nil { 411 return UploadInfo{}, uerr 412 } 413 414 // Save successfully uploaded part metadata. 415 partsInfo[partNumber] = objPart 416 417 // Save successfully uploaded size. 418 totalUploadedSize += int64(length) 419 420 // Increment part number. 421 partNumber++ 422 423 // For unknown size, Read EOF we break away. 424 // We do not have to upload till totalPartsCount. 425 if rerr == io.EOF { 426 break 427 } 428 } 429 430 // Loop over total uploaded parts to save them in 431 // Parts array before completing the multipart request. 
432 for i := 1; i < partNumber; i++ { 433 part, ok := partsInfo[i] 434 if !ok { 435 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 436 } 437 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 438 ETag: part.ETag, 439 PartNumber: part.PartNumber, 440 ChecksumCRC32: part.ChecksumCRC32, 441 ChecksumCRC32C: part.ChecksumCRC32C, 442 ChecksumSHA1: part.ChecksumSHA1, 443 ChecksumSHA256: part.ChecksumSHA256, 444 }) 445 } 446 447 // Sort all completed parts. 448 sort.Sort(completedParts(complMultipartUpload.Parts)) 449 450 opts = PutObjectOptions{} 451 if len(crcBytes) > 0 { 452 // Add hash of hashes. 453 crc.Reset() 454 crc.Write(crcBytes) 455 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} 456 } 457 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) 458 if err != nil { 459 return UploadInfo{}, err 460 } 461 462 uploadInfo.Size = totalUploadedSize 463 return uploadInfo, nil 464 }