api-compose-object.go (18408B)
/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// CopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs
type CopyDestOptions struct {
	Bucket string // points to destination bucket
	Object string // points to destination object

	// `Encryption` is the key info for server-side-encryption with customer
	// provided key. If it is nil, no encryption is performed.
	Encryption encrypt.ServerSide

	// `UserMetadata` is the user-metadata key-value pairs to be set on the
	// destination. The keys are automatically prefixed with `x-amz-meta-`
	// if needed. If nil is passed, and if only a single source (of any
	// size) is provided in the ComposeObject call, then metadata from the
	// source is copied to the destination.
	// If no user-metadata is provided, it is copied from the source
	// (when there is only one source object in the compose
	// request).
	UserMetadata map[string]string
	// UserMetadata is only set on the destination if ReplaceMetadata is true;
	// otherwise UserMetadata is ignored and src.UserMetadata is preserved.
	// NOTE: if you set this value to true and no metadata is present
	// in UserMetadata, your destination object will not have any metadata
	// set.
	ReplaceMetadata bool

	// `UserTags` is the user defined object tags to be set on destination.
	// This will be set only if the `ReplaceTags` field is set to true.
	// Otherwise this field is ignored.
	UserTags    map[string]string
	ReplaceTags bool

	// Specifies whether you want to apply a Legal Hold to the copied object.
	LegalHold LegalHoldStatus

	// Object Retention related fields.
	Mode            RetentionMode
	RetainUntilDate time.Time

	Size int64 // Needs to be specified if progress bar is specified.
	// Progress of the entire copy operation will be sent here.
	Progress io.Reader
}

// Process custom-metadata to remove a `x-amz-meta-` prefix if
// present and validate that keys are distinct (after this
// prefix removal).
// filterCustomMeta strips any (case-insensitive) `x-amz-meta-` prefix from
// the keys of the given user-metadata map. When two input keys collapse to
// the same de-prefixed key, the first one encountered wins and later
// duplicates are dropped.
func filterCustomMeta(userMeta map[string]string) map[string]string {
	m := make(map[string]string)
	for k, v := range userMeta {
		if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
			k = k[len("x-amz-meta-"):]
		}
		// Skip duplicate keys after prefix removal; keep the first value.
		if _, ok := m[k]; ok {
			continue
		}
		m[k] = v
	}
	return m
}

// Marshal converts all the CopyDestOptions into their
// equivalent HTTP header representation
func (opts CopyDestOptions) Marshal(header http.Header) {
	const replaceDirective = "REPLACE"
	if opts.ReplaceTags {
		header.Set(amzTaggingHeaderDirective, replaceDirective)
		if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
			header.Set(amzTaggingHeader, tags)
		}
	}

	if opts.LegalHold != LegalHoldStatus("") {
		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
	}

	// Retention headers are only emitted when both the mode and the
	// retain-until date are provided.
	if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
		header.Set(amzLockMode, opts.Mode.String())
		header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
	}

	if opts.Encryption != nil {
		opts.Encryption.Marshal(header)
	}

	if opts.ReplaceMetadata {
		header.Set("x-amz-metadata-directive", replaceDirective)
		for k, v := range filterCustomMeta(opts.UserMetadata) {
			// Well-known amz/standard/storage-class headers are set
			// verbatim; everything else becomes custom user metadata.
			if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
				header.Set(k, v)
			} else {
				header.Set("x-amz-meta-"+k, v)
			}
		}
	}
}

// validate checks that the destination bucket/object names are well formed
// and that Size is usable when a Progress reader is supplied.
func (opts CopyDestOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Progress != nil && opts.Size < 0 {
		return errInvalidArgument("For progress bar effective size needs to be specified")
	}
	return nil
}

// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
type CopySrcOptions struct {
	Bucket, Object       string
	VersionID            string
	MatchETag            string
	NoMatchETag          string
	MatchModifiedSince   time.Time
	MatchUnmodifiedSince time.Time
	MatchRange           bool
	Start, End           int64
	Encryption           encrypt.ServerSide
}

// Marshal converts all the CopySrcOptions into their
// equivalent HTTP header representation
func (opts CopySrcOptions) Marshal(header http.Header) {
	// Set the source header
	header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
	if opts.VersionID != "" {
		// Re-set with the versionId query suffix when a specific
		// source version is requested.
		header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
	}

	if opts.MatchETag != "" {
		header.Set("x-amz-copy-source-if-match", opts.MatchETag)
	}
	if opts.NoMatchETag != "" {
		header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
	}

	if !opts.MatchModifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
	}
	if !opts.MatchUnmodifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
	}

	if opts.Encryption != nil {
		// Copy sources use the SSE-C *copy* header variants.
		encrypt.SSECopy(opts.Encryption).Marshal(header)
	}
}

// validate checks that the source bucket/object names are well formed and
// that the [Start, End] segment (if any) is ordered and non-negative.
func (opts CopySrcOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Start > opts.End || opts.Start < 0 {
		return errInvalidArgument("start must be non-negative, and start must be at most end.")
	}
	return nil
}

// Low level implementation of CopyObject API, supports only up to 5GiB worth of copy.
func (c *Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
	metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions,
) (ObjectInfo, error) {
	// Build headers.
	headers := make(http.Header)

	// Set all the metadata headers.
	for k, v := range metadata {
		headers.Set(k, v)
	}
	// Internal replication-related headers, set only when the
	// corresponding internal option carries a non-zero value.
	if !dstOpts.Internal.ReplicationStatus.Empty() {
		headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
	}
	if !dstOpts.Internal.SourceMTime.IsZero() {
		headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
	}
	if dstOpts.Internal.SourceETag != "" {
		headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
	}
	if dstOpts.Internal.ReplicationRequest {
		headers.Set(minIOBucketReplicationRequest, "true")
	}
	if !dstOpts.Internal.LegalholdTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationObjectLegalHoldTimestamp, dstOpts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
	}
	if !dstOpts.Internal.RetentionTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationObjectRetentionTimestamp, dstOpts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
	}
	if !dstOpts.Internal.TaggingTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationTaggingTimestamp, dstOpts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
	}

	if len(dstOpts.UserTags) != 0 {
		headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
	}

	reqMetadata := requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
	}
	if dstOpts.Internal.SourceVersionID != "" {
		// Any non-null source version ID must be a valid UUID.
		if dstOpts.Internal.SourceVersionID != nullVersionID {
			if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
				return ObjectInfo{}, errInvalidArgument(err.Error())
			}
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Set the source header (headers is the same map referenced by
	// reqMetadata.customHeader, so this is picked up by the request).
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
	if srcOpts.VersionID != "" {
		headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
	}
	// Send copy-object request
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return ObjectInfo{}, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
	}

	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return ObjectInfo{}, err
	}

	objInfo := ObjectInfo{
		Key:          destObject,
		ETag:         strings.Trim(cpObjRes.ETag, "\""),
		LastModified: cpObjRes.LastModified,
	}
	return objInfo, nil
}

// copyObjectPartDo performs an upload-part-copy of [startOffset,
// startOffset+length-1] from the source object into part `partID` of the
// multipart upload identified by uploadID, returning the completed part.
// A negative length omits the range header (copy whole object).
func (c *Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
	partID int, startOffset int64, length int64, metadata map[string]string,
) (p CompletePart, err error) {
	headers := make(http.Header)

	// Set source
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))

	if startOffset < 0 {
		return p, errInvalidArgument("startOffset must be non-negative")
	}

	if length >= 0 {
		headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
	}

	for k, v := range metadata {
		headers.Set(k, v)
	}

	queryValues := make(url.Values)
	queryValues.Set("partNumber", strconv.Itoa(partID))
	queryValues.Set("uploadId", uploadID)

	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
		queryValues:  queryValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, destBucket, destObject)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partID, cpObjRes.ETag
	return p, nil
}

// uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c *Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
	headers http.Header,
) (p CompletePart, err error) {
	// Build query parameters
	urlValues := make(url.Values)
	urlValues.Set("partNumber", strconv.Itoa(partNumber))
	urlValues.Set("uploadId", uploadID)

	// Send upload-part-copy request
	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   bucket,
		objectName:   object,
		customHeader: headers,
		queryValues:  urlValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return p, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, bucket, object)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
	return p, nil
}

// ComposeObject - creates an object using server-side copying
// of existing objects. It takes a list of source objects (with optional offsets)
// and concatenates them into a new object using only server-side copying
// operations. Optionally takes progress reader hook for applications to
// look at current progress.
379 func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) { 380 if len(srcs) < 1 || len(srcs) > maxPartsCount { 381 return UploadInfo{}, errInvalidArgument("There must be as least one and up to 10000 source objects.") 382 } 383 384 for _, src := range srcs { 385 if err := src.validate(); err != nil { 386 return UploadInfo{}, err 387 } 388 } 389 390 if err := dst.validate(); err != nil { 391 return UploadInfo{}, err 392 } 393 394 srcObjectInfos := make([]ObjectInfo, len(srcs)) 395 srcObjectSizes := make([]int64, len(srcs)) 396 var totalSize, totalParts int64 397 var err error 398 for i, src := range srcs { 399 opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID} 400 srcObjectInfos[i], err = c.StatObject(context.Background(), src.Bucket, src.Object, opts) 401 if err != nil { 402 return UploadInfo{}, err 403 } 404 405 srcCopySize := srcObjectInfos[i].Size 406 // Check if a segment is specified, and if so, is the 407 // segment within object bounds? 408 if src.MatchRange { 409 // Since range is specified, 410 // 0 <= src.start <= src.end 411 // so only invalid case to check is: 412 if src.End >= srcCopySize || src.Start < 0 { 413 return UploadInfo{}, errInvalidArgument( 414 fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)", 415 i, src.Start, src.End, srcCopySize)) 416 } 417 srcCopySize = src.End - src.Start + 1 418 } 419 420 // Only the last source may be less than `absMinPartSize` 421 if srcCopySize < absMinPartSize && i < len(srcs)-1 { 422 return UploadInfo{}, errInvalidArgument( 423 fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize)) 424 } 425 426 // Is data to copy too large? 
427 totalSize += srcCopySize 428 if totalSize > maxMultipartPutObjectSize { 429 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize)) 430 } 431 432 // record source size 433 srcObjectSizes[i] = srcCopySize 434 435 // calculate parts needed for current source 436 totalParts += partsRequired(srcCopySize) 437 // Do we need more parts than we are allowed? 438 if totalParts > maxPartsCount { 439 return UploadInfo{}, errInvalidArgument(fmt.Sprintf( 440 "Your proposed compose object requires more than %d parts", maxPartsCount)) 441 } 442 } 443 444 // Single source object case (i.e. when only one source is 445 // involved, it is being copied wholly and at most 5GiB in 446 // size, emptyfiles are also supported). 447 if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) { 448 return c.CopyObject(ctx, dst, srcs[0]) 449 } 450 451 // Now, handle multipart-copy cases. 452 453 // 1. Ensure that the object has not been changed while 454 // we are copying data. 455 for i, src := range srcs { 456 src.MatchETag = srcObjectInfos[i].ETag 457 } 458 459 // 2. Initiate a new multipart upload. 460 461 // Set user-metadata on the destination object. If no 462 // user-metadata is specified, and there is only one source, 463 // (only) then metadata from source is copied. 
464 var userMeta map[string]string 465 if dst.ReplaceMetadata { 466 userMeta = dst.UserMetadata 467 } else { 468 userMeta = srcObjectInfos[0].UserMetadata 469 } 470 471 var userTags map[string]string 472 if dst.ReplaceTags { 473 userTags = dst.UserTags 474 } else { 475 userTags = srcObjectInfos[0].UserTags 476 } 477 478 uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{ 479 ServerSideEncryption: dst.Encryption, 480 UserMetadata: userMeta, 481 UserTags: userTags, 482 Mode: dst.Mode, 483 RetainUntilDate: dst.RetainUntilDate, 484 LegalHold: dst.LegalHold, 485 }) 486 if err != nil { 487 return UploadInfo{}, err 488 } 489 490 // 3. Perform copy part uploads 491 objParts := []CompletePart{} 492 partIndex := 1 493 for i, src := range srcs { 494 h := make(http.Header) 495 src.Marshal(h) 496 if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC { 497 dst.Encryption.Marshal(h) 498 } 499 500 // calculate start/end indices of parts after 501 // splitting. 502 startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src) 503 for j, start := range startIdx { 504 end := endIdx[j] 505 506 // Add (or reset) source range header for 507 // upload part copy request. 508 h.Set("x-amz-copy-source-range", 509 fmt.Sprintf("bytes=%d-%d", start, end)) 510 511 // make upload-part-copy request 512 complPart, err := c.uploadPartCopy(ctx, dst.Bucket, 513 dst.Object, uploadID, partIndex, h) 514 if err != nil { 515 return UploadInfo{}, err 516 } 517 if dst.Progress != nil { 518 io.CopyN(io.Discard, dst.Progress, end-start+1) 519 } 520 objParts = append(objParts, complPart) 521 partIndex++ 522 } 523 } 524 525 // 4. Make final complete-multipart request. 
526 uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID, 527 completeMultipartUpload{Parts: objParts}, PutObjectOptions{ServerSideEncryption: dst.Encryption}) 528 if err != nil { 529 return UploadInfo{}, err 530 } 531 532 uploadInfo.Size = totalSize 533 return uploadInfo, nil 534 } 535 536 // partsRequired is maximum parts possible with 537 // max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1)) 538 func partsRequired(size int64) int64 { 539 maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1) 540 r := size / int64(maxPartSize) 541 if size%int64(maxPartSize) > 0 { 542 r++ 543 } 544 return r 545 } 546 547 // calculateEvenSplits - computes splits for a source and returns 548 // start and end index slices. Splits happen evenly to be sure that no 549 // part is less than 5MiB, as that could fail the multipart request if 550 // it is not the last part. 551 func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) { 552 if size == 0 { 553 return 554 } 555 556 reqParts := partsRequired(size) 557 startIndex = make([]int64, reqParts) 558 endIndex = make([]int64, reqParts) 559 // Compute number of required parts `k`, as: 560 // 561 // k = ceiling(size / copyPartSize) 562 // 563 // Now, distribute the `size` bytes in the source into 564 // k parts as evenly as possible: 565 // 566 // r parts sized (q+1) bytes, and 567 // (k - r) parts sized q bytes, where 568 // 569 // size = q * k + r (by simple division of size by k, 570 // so that 0 <= r < k) 571 // 572 start := src.Start 573 if start == -1 { 574 start = 0 575 } 576 quot, rem := size/reqParts, size%reqParts 577 nextStart := start 578 for j := int64(0); j < reqParts; j++ { 579 curPartSize := quot 580 if j < rem { 581 curPartSize++ 582 } 583 584 cStart := nextStart 585 cEnd := cStart + curPartSize - 1 586 nextStart = cEnd + 1 587 588 startIndex[j], endIndex[j] = cStart, cEnd 589 } 590 return 591 }