api-put-object-multipart.go (15167B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2015-2017 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "bytes" 22 "context" 23 "encoding/base64" 24 "encoding/hex" 25 "encoding/xml" 26 "fmt" 27 "hash/crc32" 28 "io" 29 "net/http" 30 "net/url" 31 "sort" 32 "strconv" 33 "strings" 34 35 "github.com/google/uuid" 36 "github.com/minio/minio-go/v7/pkg/encrypt" 37 "github.com/minio/minio-go/v7/pkg/s3utils" 38 ) 39 40 func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, 41 opts PutObjectOptions, 42 ) (info UploadInfo, err error) { 43 info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts) 44 if err != nil { 45 errResp := ToErrorResponse(err) 46 // Verify if multipart functionality is not available, if not 47 // fall back to single PutObject operation. 48 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") { 49 // Verify if size of reader is greater than '5GiB'. 50 if size > maxSinglePutObjectSize { 51 return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) 52 } 53 // Fall back to uploading as single PutObject operation. 54 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 55 } 56 } 57 return info, err 58 } 59 60 func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) { 61 // Input validation. 62 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 63 return UploadInfo{}, err 64 } 65 if err = s3utils.CheckValidObjectName(objectName); err != nil { 66 return UploadInfo{}, err 67 } 68 69 // Total data read and written to server. should be equal to 70 // 'size' at the end of the call. 71 var totalUploadedSize int64 72 73 // Complete multipart upload. 74 var complMultipartUpload completeMultipartUpload 75 76 // Calculate the optimal parts info for a given size. 77 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) 78 if err != nil { 79 return UploadInfo{}, err 80 } 81 82 // Choose hash algorithms to be calculated by hashCopyN, 83 // avoid sha256 with non-v4 signature request or 84 // HTTPS connection. 85 hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256) 86 if len(hashSums) == 0 { 87 if opts.UserMetadata == nil { 88 opts.UserMetadata = make(map[string]string, 1) 89 } 90 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" 91 } 92 93 // Initiate a new multipart upload. 94 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 95 if err != nil { 96 return UploadInfo{}, err 97 } 98 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") 99 100 defer func() { 101 if err != nil { 102 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 103 } 104 }() 105 106 // Part number always starts with '1'. 107 partNumber := 1 108 109 // Initialize parts uploaded map. 110 partsInfo := make(map[int]ObjectPart) 111 112 // Create a buffer. 113 buf := make([]byte, partSize) 114 115 // Create checksums 116 // CRC32C is ~50% faster on AMD64 @ 30GB/s 117 var crcBytes []byte 118 customHeader := make(http.Header) 119 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 120 for partNumber <= totalPartsCount { 121 length, rErr := readFull(reader, buf) 122 if rErr == io.EOF && partNumber > 1 { 123 break 124 } 125 126 if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF { 127 return UploadInfo{}, rErr 128 } 129 130 // Calculates hash sums while copying partSize bytes into cw. 131 for k, v := range hashAlgos { 132 v.Write(buf[:length]) 133 hashSums[k] = v.Sum(nil) 134 v.Close() 135 } 136 137 // Update progress reader appropriately to the latest offset 138 // as we read from the source. 139 rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) 140 141 // Checksums.. 142 var ( 143 md5Base64 string 144 sha256Hex string 145 ) 146 147 if hashSums["md5"] != nil { 148 md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"]) 149 } 150 if hashSums["sha256"] != nil { 151 sha256Hex = hex.EncodeToString(hashSums["sha256"]) 152 } 153 if len(hashSums) == 0 { 154 crc.Reset() 155 crc.Write(buf[:length]) 156 cSum := crc.Sum(nil) 157 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) 158 crcBytes = append(crcBytes, cSum...) 159 } 160 161 p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader} 162 // Proceed to upload the part. 163 objPart, uerr := c.uploadPart(ctx, p) 164 if uerr != nil { 165 return UploadInfo{}, uerr 166 } 167 168 // Save successfully uploaded part metadata. 169 partsInfo[partNumber] = objPart 170 171 // Save successfully uploaded size. 172 totalUploadedSize += int64(length) 173 174 // Increment part number. 175 partNumber++ 176 177 // For unknown size, Read EOF we break away. 178 // We do not have to upload till totalPartsCount. 179 if rErr == io.EOF { 180 break 181 } 182 } 183 184 // Loop over total uploaded parts to save them in 185 // Parts array before completing the multipart request. 186 for i := 1; i < partNumber; i++ { 187 part, ok := partsInfo[i] 188 if !ok { 189 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 190 } 191 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 192 ETag: part.ETag, 193 PartNumber: part.PartNumber, 194 ChecksumCRC32: part.ChecksumCRC32, 195 ChecksumCRC32C: part.ChecksumCRC32C, 196 ChecksumSHA1: part.ChecksumSHA1, 197 ChecksumSHA256: part.ChecksumSHA256, 198 }) 199 } 200 201 // Sort all completed parts. 202 sort.Sort(completedParts(complMultipartUpload.Parts)) 203 opts = PutObjectOptions{ 204 ServerSideEncryption: opts.ServerSideEncryption, 205 } 206 if len(crcBytes) > 0 { 207 // Add hash of hashes. 208 crc.Reset() 209 crc.Write(crcBytes) 210 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} 211 } 212 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) 213 if err != nil { 214 return UploadInfo{}, err 215 } 216 217 uploadInfo.Size = totalUploadedSize 218 return uploadInfo, nil 219 } 220 221 // initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. 222 func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) { 223 // Input validation. 224 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 225 return initiateMultipartUploadResult{}, err 226 } 227 if err := s3utils.CheckValidObjectName(objectName); err != nil { 228 return initiateMultipartUploadResult{}, err 229 } 230 231 // Initialize url queries. 232 urlValues := make(url.Values) 233 urlValues.Set("uploads", "") 234 235 if opts.Internal.SourceVersionID != "" { 236 if opts.Internal.SourceVersionID != nullVersionID { 237 if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil { 238 return initiateMultipartUploadResult{}, errInvalidArgument(err.Error()) 239 } 240 } 241 urlValues.Set("versionId", opts.Internal.SourceVersionID) 242 } 243 244 // Set ContentType header. 245 customHeader := opts.Header() 246 247 reqMetadata := requestMetadata{ 248 bucketName: bucketName, 249 objectName: objectName, 250 queryValues: urlValues, 251 customHeader: customHeader, 252 } 253 254 // Execute POST on an objectName to initiate multipart upload. 255 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata) 256 defer closeResponse(resp) 257 if err != nil { 258 return initiateMultipartUploadResult{}, err 259 } 260 if resp != nil { 261 if resp.StatusCode != http.StatusOK { 262 return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) 263 } 264 } 265 // Decode xml for new multipart upload. 266 initiateMultipartUploadResult := initiateMultipartUploadResult{} 267 err = xmlDecoder(resp.Body, &initiateMultipartUploadResult) 268 if err != nil { 269 return initiateMultipartUploadResult, err 270 } 271 return initiateMultipartUploadResult, nil 272 } 273 274 type uploadPartParams struct { 275 bucketName string 276 objectName string 277 uploadID string 278 reader io.Reader 279 partNumber int 280 md5Base64 string 281 sha256Hex string 282 size int64 283 sse encrypt.ServerSide 284 streamSha256 bool 285 customHeader http.Header 286 trailer http.Header 287 } 288 289 // uploadPart - Uploads a part in a multipart upload. 290 func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart, error) { 291 // Input validation. 292 if err := s3utils.CheckValidBucketName(p.bucketName); err != nil { 293 return ObjectPart{}, err 294 } 295 if err := s3utils.CheckValidObjectName(p.objectName); err != nil { 296 return ObjectPart{}, err 297 } 298 if p.size > maxPartSize { 299 return ObjectPart{}, errEntityTooLarge(p.size, maxPartSize, p.bucketName, p.objectName) 300 } 301 if p.size <= -1 { 302 return ObjectPart{}, errEntityTooSmall(p.size, p.bucketName, p.objectName) 303 } 304 if p.partNumber <= 0 { 305 return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.") 306 } 307 if p.uploadID == "" { 308 return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.") 309 } 310 311 // Get resources properly escaped and lined up before using them in http request. 312 urlValues := make(url.Values) 313 // Set part number. 314 urlValues.Set("partNumber", strconv.Itoa(p.partNumber)) 315 // Set upload id. 316 urlValues.Set("uploadId", p.uploadID) 317 318 // Set encryption headers, if any. 319 if p.customHeader == nil { 320 p.customHeader = make(http.Header) 321 } 322 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html 323 // Server-side encryption is supported by the S3 Multipart Upload actions. 324 // Unless you are using a customer-provided encryption key, you don't need 325 // to specify the encryption parameters in each UploadPart request. 326 if p.sse != nil && p.sse.Type() == encrypt.SSEC { 327 p.sse.Marshal(p.customHeader) 328 } 329 330 reqMetadata := requestMetadata{ 331 bucketName: p.bucketName, 332 objectName: p.objectName, 333 queryValues: urlValues, 334 customHeader: p.customHeader, 335 contentBody: p.reader, 336 contentLength: p.size, 337 contentMD5Base64: p.md5Base64, 338 contentSHA256Hex: p.sha256Hex, 339 streamSha256: p.streamSha256, 340 trailer: p.trailer, 341 } 342 343 // Execute PUT on each part. 344 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 345 defer closeResponse(resp) 346 if err != nil { 347 return ObjectPart{}, err 348 } 349 if resp != nil { 350 if resp.StatusCode != http.StatusOK { 351 return ObjectPart{}, httpRespToErrorResponse(resp, p.bucketName, p.objectName) 352 } 353 } 354 // Once successfully uploaded, return completed part. 355 h := resp.Header 356 objPart := ObjectPart{ 357 ChecksumCRC32: h.Get("x-amz-checksum-crc32"), 358 ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), 359 ChecksumSHA1: h.Get("x-amz-checksum-sha1"), 360 ChecksumSHA256: h.Get("x-amz-checksum-sha256"), 361 } 362 objPart.Size = p.size 363 objPart.PartNumber = p.partNumber 364 // Trim off the odd double quotes from ETag in the beginning and end. 365 objPart.ETag = trimEtag(h.Get("ETag")) 366 return objPart, nil 367 } 368 369 // completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. 370 func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string, 371 complete completeMultipartUpload, opts PutObjectOptions, 372 ) (UploadInfo, error) { 373 // Input validation. 374 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 375 return UploadInfo{}, err 376 } 377 if err := s3utils.CheckValidObjectName(objectName); err != nil { 378 return UploadInfo{}, err 379 } 380 381 // Initialize url queries. 382 urlValues := make(url.Values) 383 urlValues.Set("uploadId", uploadID) 384 // Marshal complete multipart body. 385 completeMultipartUploadBytes, err := xml.Marshal(complete) 386 if err != nil { 387 return UploadInfo{}, err 388 } 389 390 headers := opts.Header() 391 if s3utils.IsAmazonEndpoint(*c.endpointURL) { 392 headers.Del(encrypt.SseKmsKeyID) // Remove X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id not supported in CompleteMultipartUpload 393 headers.Del(encrypt.SseGenericHeader) // Remove X-Amz-Server-Side-Encryption not supported in CompleteMultipartUpload 394 } 395 396 // Instantiate all the complete multipart buffer. 397 completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes) 398 reqMetadata := requestMetadata{ 399 bucketName: bucketName, 400 objectName: objectName, 401 queryValues: urlValues, 402 contentBody: completeMultipartUploadBuffer, 403 contentLength: int64(len(completeMultipartUploadBytes)), 404 contentSHA256Hex: sum256Hex(completeMultipartUploadBytes), 405 customHeader: headers, 406 } 407 408 // Execute POST to complete multipart upload for an objectName. 409 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata) 410 defer closeResponse(resp) 411 if err != nil { 412 return UploadInfo{}, err 413 } 414 if resp != nil { 415 if resp.StatusCode != http.StatusOK { 416 return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName) 417 } 418 } 419 420 // Read resp.Body into a []bytes to parse for Error response inside the body 421 var b []byte 422 b, err = io.ReadAll(resp.Body) 423 if err != nil { 424 return UploadInfo{}, err 425 } 426 // Decode completed multipart upload response on success. 427 completeMultipartUploadResult := completeMultipartUploadResult{} 428 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult) 429 if err != nil { 430 // xml parsing failure due to presence an ill-formed xml fragment 431 return UploadInfo{}, err 432 } else if completeMultipartUploadResult.Bucket == "" { 433 // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied. 434 // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values 435 // of the members. 436 437 // Decode completed multipart upload response on failure 438 completeMultipartUploadErr := ErrorResponse{} 439 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr) 440 if err != nil { 441 // xml parsing failure due to presence an ill-formed xml fragment 442 return UploadInfo{}, err 443 } 444 return UploadInfo{}, completeMultipartUploadErr 445 } 446 447 // extract lifecycle expiry date and rule ID 448 expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) 449 450 return UploadInfo{ 451 Bucket: completeMultipartUploadResult.Bucket, 452 Key: completeMultipartUploadResult.Key, 453 ETag: trimEtag(completeMultipartUploadResult.ETag), 454 VersionID: resp.Header.Get(amzVersionID), 455 Location: completeMultipartUploadResult.Location, 456 Expiration: expTime, 457 ExpirationRuleID: ruleID, 458 459 ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256, 460 ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1, 461 ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32, 462 ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C, 463 }, nil 464 }