api-put-object-streaming.go (25127B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2017 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "bytes" 22 "context" 23 "encoding/base64" 24 "fmt" 25 "hash/crc32" 26 "io" 27 "net/http" 28 "net/url" 29 "sort" 30 "strings" 31 "sync" 32 33 "github.com/google/uuid" 34 "github.com/minio/minio-go/v7/pkg/s3utils" 35 ) 36 37 // putObjectMultipartStream - upload a large object using 38 // multipart upload and streaming signature for signing payload. 39 // Comprehensive put object operation involving multipart uploads. 40 // 41 // Following code handles these types of readers. 42 // 43 // - *minio.Object 44 // - Any reader which has a method 'ReadAt()' 45 func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string, 46 reader io.Reader, size int64, opts PutObjectOptions, 47 ) (info UploadInfo, err error) { 48 if opts.ConcurrentStreamParts && opts.NumThreads > 1 { 49 info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts) 50 } else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 { 51 // Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader. 52 info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts) 53 } else { 54 info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts) 55 } 56 if err != nil { 57 errResp := ToErrorResponse(err) 58 // Verify if multipart functionality is not available, if not 59 // fall back to single PutObject operation. 60 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") { 61 // Verify if size of reader is greater than '5GiB'. 62 if size > maxSinglePutObjectSize { 63 return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) 64 } 65 // Fall back to uploading as single PutObject operation. 66 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 67 } 68 } 69 return info, err 70 } 71 72 // uploadedPartRes - the response received from a part upload. 73 type uploadedPartRes struct { 74 Error error // Any error encountered while uploading the part. 75 PartNum int // Number of the part uploaded. 76 Size int64 // Size of the part uploaded. 77 Part ObjectPart 78 } 79 80 type uploadPartReq struct { 81 PartNum int // Number of the part uploaded. 82 Part ObjectPart // Size of the part uploaded. 83 } 84 85 // putObjectMultipartFromReadAt - Uploads files bigger than 128MiB. 86 // Supports all readers which implements io.ReaderAt interface 87 // (ReadAt method). 88 // 89 // NOTE: This function is meant to be used for all readers which 90 // implement io.ReaderAt which allows us for resuming multipart 91 // uploads but reading at an offset, which would avoid re-read the 92 // data which was already uploaded. Internally this function uses 93 // temporary files for staging all the data, these temporary files are 94 // cleaned automatically when the caller i.e http client closes the 95 // stream after uploading all the contents successfully. 96 func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string, 97 reader io.ReaderAt, size int64, opts PutObjectOptions, 98 ) (info UploadInfo, err error) { 99 // Input validation. 100 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 101 return UploadInfo{}, err 102 } 103 if err = s3utils.CheckValidObjectName(objectName); err != nil { 104 return UploadInfo{}, err 105 } 106 107 // Calculate the optimal parts info for a given size. 108 totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) 109 if err != nil { 110 return UploadInfo{}, err 111 } 112 113 withChecksum := c.trailingHeaderSupport 114 if withChecksum { 115 if opts.UserMetadata == nil { 116 opts.UserMetadata = make(map[string]string, 1) 117 } 118 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" 119 } 120 // Initiate a new multipart upload. 121 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 122 if err != nil { 123 return UploadInfo{}, err 124 } 125 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") 126 127 // Aborts the multipart upload in progress, if the 128 // function returns any error, since we do not resume 129 // we should purge the parts which have been uploaded 130 // to relinquish storage space. 131 defer func() { 132 if err != nil { 133 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 134 } 135 }() 136 137 // Total data read and written to server. should be equal to 'size' at the end of the call. 138 var totalUploadedSize int64 139 140 // Complete multipart upload. 141 var complMultipartUpload completeMultipartUpload 142 143 // Declare a channel that sends the next part number to be uploaded. 144 uploadPartsCh := make(chan uploadPartReq) 145 146 // Declare a channel that sends back the response of a part upload. 147 uploadedPartsCh := make(chan uploadedPartRes) 148 149 // Used for readability, lastPartNumber is always totalPartsCount. 150 lastPartNumber := totalPartsCount 151 152 partitionCtx, partitionCancel := context.WithCancel(ctx) 153 defer partitionCancel() 154 // Send each part number to the channel to be processed. 155 go func() { 156 defer close(uploadPartsCh) 157 158 for p := 1; p <= totalPartsCount; p++ { 159 select { 160 case <-partitionCtx.Done(): 161 return 162 case uploadPartsCh <- uploadPartReq{PartNum: p}: 163 } 164 } 165 }() 166 167 // Receive each part number from the channel allowing three parallel uploads. 168 for w := 1; w <= opts.getNumThreads(); w++ { 169 go func(partSize int64) { 170 for { 171 var uploadReq uploadPartReq 172 var ok bool 173 select { 174 case <-ctx.Done(): 175 return 176 case uploadReq, ok = <-uploadPartsCh: 177 if !ok { 178 return 179 } 180 // Each worker will draw from the part channel and upload in parallel. 181 } 182 183 // If partNumber was not uploaded we calculate the missing 184 // part offset and size. For all other part numbers we 185 // calculate offset based on multiples of partSize. 186 readOffset := int64(uploadReq.PartNum-1) * partSize 187 188 // As a special case if partNumber is lastPartNumber, we 189 // calculate the offset based on the last part size. 190 if uploadReq.PartNum == lastPartNumber { 191 readOffset = size - lastPartSize 192 partSize = lastPartSize 193 } 194 195 sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) 196 trailer := make(http.Header, 1) 197 if withChecksum { 198 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 199 trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil))) 200 sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) { 201 trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash)) 202 }) 203 } 204 205 // Proceed to upload the part. 206 p := uploadPartParams{ 207 bucketName: bucketName, 208 objectName: objectName, 209 uploadID: uploadID, 210 reader: sectionReader, 211 partNumber: uploadReq.PartNum, 212 size: partSize, 213 sse: opts.ServerSideEncryption, 214 streamSha256: !opts.DisableContentSha256, 215 sha256Hex: "", 216 trailer: trailer, 217 } 218 objPart, err := c.uploadPart(ctx, p) 219 if err != nil { 220 uploadedPartsCh <- uploadedPartRes{ 221 Error: err, 222 } 223 // Exit the goroutine. 224 return 225 } 226 227 // Save successfully uploaded part metadata. 228 uploadReq.Part = objPart 229 230 // Send successful part info through the channel. 231 uploadedPartsCh <- uploadedPartRes{ 232 Size: objPart.Size, 233 PartNum: uploadReq.PartNum, 234 Part: uploadReq.Part, 235 } 236 } 237 }(partSize) 238 } 239 240 // Gather the responses as they occur and update any 241 // progress bar. 242 for u := 1; u <= totalPartsCount; u++ { 243 select { 244 case <-ctx.Done(): 245 return UploadInfo{}, ctx.Err() 246 case uploadRes := <-uploadedPartsCh: 247 if uploadRes.Error != nil { 248 return UploadInfo{}, uploadRes.Error 249 } 250 251 // Update the totalUploadedSize. 252 totalUploadedSize += uploadRes.Size 253 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 254 ETag: uploadRes.Part.ETag, 255 PartNumber: uploadRes.Part.PartNumber, 256 ChecksumCRC32: uploadRes.Part.ChecksumCRC32, 257 ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C, 258 ChecksumSHA1: uploadRes.Part.ChecksumSHA1, 259 ChecksumSHA256: uploadRes.Part.ChecksumSHA256, 260 }) 261 } 262 } 263 264 // Verify if we uploaded all the data. 265 if totalUploadedSize != size { 266 return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) 267 } 268 269 // Sort all completed parts. 270 sort.Sort(completedParts(complMultipartUpload.Parts)) 271 272 opts = PutObjectOptions{ 273 ServerSideEncryption: opts.ServerSideEncryption, 274 } 275 if withChecksum { 276 // Add hash of hashes. 277 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 278 for _, part := range complMultipartUpload.Parts { 279 cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C) 280 if err == nil { 281 crc.Write(cs) 282 } 283 } 284 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} 285 } 286 287 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) 288 if err != nil { 289 return UploadInfo{}, err 290 } 291 292 uploadInfo.Size = totalUploadedSize 293 return uploadInfo, nil 294 } 295 296 func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string, 297 reader io.Reader, size int64, opts PutObjectOptions, 298 ) (info UploadInfo, err error) { 299 // Input validation. 300 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 301 return UploadInfo{}, err 302 } 303 if err = s3utils.CheckValidObjectName(objectName); err != nil { 304 return UploadInfo{}, err 305 } 306 307 if !opts.SendContentMd5 { 308 if opts.UserMetadata == nil { 309 opts.UserMetadata = make(map[string]string, 1) 310 } 311 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" 312 } 313 314 // Calculate the optimal parts info for a given size. 315 totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) 316 if err != nil { 317 return UploadInfo{}, err 318 } 319 // Initiates a new multipart request 320 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 321 if err != nil { 322 return UploadInfo{}, err 323 } 324 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") 325 326 // Aborts the multipart upload if the function returns 327 // any error, since we do not resume we should purge 328 // the parts which have been uploaded to relinquish 329 // storage space. 330 defer func() { 331 if err != nil { 332 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 333 } 334 }() 335 336 // Create checksums 337 // CRC32C is ~50% faster on AMD64 @ 30GB/s 338 var crcBytes []byte 339 customHeader := make(http.Header) 340 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 341 md5Hash := c.md5Hasher() 342 defer md5Hash.Close() 343 344 // Total data read and written to server. should be equal to 'size' at the end of the call. 345 var totalUploadedSize int64 346 347 // Initialize parts uploaded map. 348 partsInfo := make(map[int]ObjectPart) 349 350 // Create a buffer. 351 buf := make([]byte, partSize) 352 353 // Avoid declaring variables in the for loop 354 var md5Base64 string 355 356 // Part number always starts with '1'. 357 var partNumber int 358 for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { 359 360 // Proceed to upload the part. 361 if partNumber == totalPartsCount { 362 partSize = lastPartSize 363 } 364 365 length, rerr := readFull(reader, buf) 366 if rerr == io.EOF && partNumber > 1 { 367 break 368 } 369 370 if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { 371 return UploadInfo{}, rerr 372 } 373 374 // Calculate md5sum. 375 if opts.SendContentMd5 { 376 md5Hash.Reset() 377 md5Hash.Write(buf[:length]) 378 md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) 379 } else { 380 // Add CRC32C instead. 381 crc.Reset() 382 crc.Write(buf[:length]) 383 cSum := crc.Sum(nil) 384 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) 385 crcBytes = append(crcBytes, cSum...) 386 } 387 388 // Update progress reader appropriately to the latest offset 389 // as we read from the source. 390 hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress) 391 p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader} 392 objPart, uerr := c.uploadPart(ctx, p) 393 if uerr != nil { 394 return UploadInfo{}, uerr 395 } 396 397 // Save successfully uploaded part metadata. 398 partsInfo[partNumber] = objPart 399 400 // Save successfully uploaded size. 401 totalUploadedSize += partSize 402 } 403 404 // Verify if we uploaded all the data. 405 if size > 0 { 406 if totalUploadedSize != size { 407 return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) 408 } 409 } 410 411 // Complete multipart upload. 412 var complMultipartUpload completeMultipartUpload 413 414 // Loop over total uploaded parts to save them in 415 // Parts array before completing the multipart request. 416 for i := 1; i < partNumber; i++ { 417 part, ok := partsInfo[i] 418 if !ok { 419 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 420 } 421 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 422 ETag: part.ETag, 423 PartNumber: part.PartNumber, 424 ChecksumCRC32: part.ChecksumCRC32, 425 ChecksumCRC32C: part.ChecksumCRC32C, 426 ChecksumSHA1: part.ChecksumSHA1, 427 ChecksumSHA256: part.ChecksumSHA256, 428 }) 429 } 430 431 // Sort all completed parts. 432 sort.Sort(completedParts(complMultipartUpload.Parts)) 433 434 opts = PutObjectOptions{ 435 ServerSideEncryption: opts.ServerSideEncryption, 436 } 437 if len(crcBytes) > 0 { 438 // Add hash of hashes. 439 crc.Reset() 440 crc.Write(crcBytes) 441 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} 442 } 443 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) 444 if err != nil { 445 return UploadInfo{}, err 446 } 447 448 uploadInfo.Size = totalUploadedSize 449 return uploadInfo, nil 450 } 451 452 // putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel. 453 // This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer. 454 func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string, 455 reader io.Reader, opts PutObjectOptions, 456 ) (info UploadInfo, err error) { 457 // Input validation. 458 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 459 return UploadInfo{}, err 460 } 461 462 if err = s3utils.CheckValidObjectName(objectName); err != nil { 463 return UploadInfo{}, err 464 } 465 466 if !opts.SendContentMd5 { 467 if opts.UserMetadata == nil { 468 opts.UserMetadata = make(map[string]string, 1) 469 } 470 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" 471 } 472 473 // Cancel all when an error occurs. 474 ctx, cancel := context.WithCancel(ctx) 475 defer cancel() 476 477 // Calculate the optimal parts info for a given size. 478 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) 479 if err != nil { 480 return UploadInfo{}, err 481 } 482 483 // Initiates a new multipart request 484 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 485 if err != nil { 486 return UploadInfo{}, err 487 } 488 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") 489 490 // Aborts the multipart upload if the function returns 491 // any error, since we do not resume we should purge 492 // the parts which have been uploaded to relinquish 493 // storage space. 494 defer func() { 495 if err != nil { 496 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 497 } 498 }() 499 500 // Create checksums 501 // CRC32C is ~50% faster on AMD64 @ 30GB/s 502 var crcBytes []byte 503 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 504 505 // Total data read and written to server. should be equal to 'size' at the end of the call. 506 var totalUploadedSize int64 507 508 // Initialize parts uploaded map. 509 partsInfo := make(map[int]ObjectPart) 510 511 // Create a buffer. 512 nBuffers := int64(opts.NumThreads) 513 bufs := make(chan []byte, nBuffers) 514 all := make([]byte, nBuffers*partSize) 515 for i := int64(0); i < nBuffers; i++ { 516 bufs <- all[i*partSize : i*partSize+partSize] 517 } 518 519 var wg sync.WaitGroup 520 var mu sync.Mutex 521 errCh := make(chan error, opts.NumThreads) 522 523 reader = newHook(reader, opts.Progress) 524 525 // Part number always starts with '1'. 526 var partNumber int 527 for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { 528 // Proceed to upload the part. 529 var buf []byte 530 select { 531 case buf = <-bufs: 532 case err = <-errCh: 533 cancel() 534 wg.Wait() 535 return UploadInfo{}, err 536 } 537 538 if int64(len(buf)) != partSize { 539 return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize) 540 } 541 542 length, rerr := readFull(reader, buf) 543 if rerr == io.EOF && partNumber > 1 { 544 // Done 545 break 546 } 547 548 if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { 549 cancel() 550 wg.Wait() 551 return UploadInfo{}, rerr 552 } 553 554 // Calculate md5sum. 555 customHeader := make(http.Header) 556 if !opts.SendContentMd5 { 557 // Add CRC32C instead. 558 crc.Reset() 559 crc.Write(buf[:length]) 560 cSum := crc.Sum(nil) 561 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) 562 crcBytes = append(crcBytes, cSum...) 563 } 564 565 wg.Add(1) 566 go func(partNumber int) { 567 // Avoid declaring variables in the for loop 568 var md5Base64 string 569 570 if opts.SendContentMd5 { 571 md5Hash := c.md5Hasher() 572 md5Hash.Write(buf[:length]) 573 md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) 574 md5Hash.Close() 575 } 576 577 defer wg.Done() 578 p := uploadPartParams{ 579 bucketName: bucketName, 580 objectName: objectName, 581 uploadID: uploadID, 582 reader: bytes.NewReader(buf[:length]), 583 partNumber: partNumber, 584 md5Base64: md5Base64, 585 size: int64(length), 586 sse: opts.ServerSideEncryption, 587 streamSha256: !opts.DisableContentSha256, 588 customHeader: customHeader, 589 } 590 objPart, uerr := c.uploadPart(ctx, p) 591 if uerr != nil { 592 errCh <- uerr 593 return 594 } 595 596 // Save successfully uploaded part metadata. 597 mu.Lock() 598 partsInfo[partNumber] = objPart 599 mu.Unlock() 600 601 // Send buffer back so it can be reused. 602 bufs <- buf 603 }(partNumber) 604 605 // Save successfully uploaded size. 606 totalUploadedSize += int64(length) 607 } 608 wg.Wait() 609 610 // Collect any error 611 select { 612 case err = <-errCh: 613 return UploadInfo{}, err 614 default: 615 } 616 617 // Complete multipart upload. 618 var complMultipartUpload completeMultipartUpload 619 620 // Loop over total uploaded parts to save them in 621 // Parts array before completing the multipart request. 622 for i := 1; i < partNumber; i++ { 623 part, ok := partsInfo[i] 624 if !ok { 625 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 626 } 627 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 628 ETag: part.ETag, 629 PartNumber: part.PartNumber, 630 ChecksumCRC32: part.ChecksumCRC32, 631 ChecksumCRC32C: part.ChecksumCRC32C, 632 ChecksumSHA1: part.ChecksumSHA1, 633 ChecksumSHA256: part.ChecksumSHA256, 634 }) 635 } 636 637 // Sort all completed parts. 638 sort.Sort(completedParts(complMultipartUpload.Parts)) 639 640 opts = PutObjectOptions{} 641 if len(crcBytes) > 0 { 642 // Add hash of hashes. 643 crc.Reset() 644 crc.Write(crcBytes) 645 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} 646 } 647 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) 648 if err != nil { 649 return UploadInfo{}, err 650 } 651 652 uploadInfo.Size = totalUploadedSize 653 return uploadInfo, nil 654 } 655 656 // putObject special function used Google Cloud Storage. This special function 657 // is used for Google Cloud Storage since Google's multipart API is not S3 compatible. 658 func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) { 659 // Input validation. 660 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 661 return UploadInfo{}, err 662 } 663 if err := s3utils.CheckValidObjectName(objectName); err != nil { 664 return UploadInfo{}, err 665 } 666 667 // Size -1 is only supported on Google Cloud Storage, we error 668 // out in all other situations. 669 if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) { 670 return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName) 671 } 672 673 if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 { 674 return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'") 675 } 676 677 var readSeeker io.Seeker 678 if size > 0 { 679 if isReadAt(reader) && !isObject(reader) { 680 seeker, ok := reader.(io.Seeker) 681 if ok { 682 offset, err := seeker.Seek(0, io.SeekCurrent) 683 if err != nil { 684 return UploadInfo{}, errInvalidArgument(err.Error()) 685 } 686 reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size) 687 readSeeker = reader.(io.Seeker) 688 } 689 } 690 } 691 692 var md5Base64 string 693 if opts.SendContentMd5 { 694 // Calculate md5sum. 695 hash := c.md5Hasher() 696 697 if readSeeker != nil { 698 if _, err := io.Copy(hash, reader); err != nil { 699 return UploadInfo{}, err 700 } 701 // Seek back to beginning of io.NewSectionReader's offset. 702 _, err = readSeeker.Seek(0, io.SeekStart) 703 if err != nil { 704 return UploadInfo{}, errInvalidArgument(err.Error()) 705 } 706 } else { 707 // Create a buffer. 708 buf := make([]byte, size) 709 710 length, err := readFull(reader, buf) 711 if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { 712 return UploadInfo{}, err 713 } 714 715 hash.Write(buf[:length]) 716 reader = bytes.NewReader(buf[:length]) 717 } 718 719 md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) 720 hash.Close() 721 } 722 723 // Update progress reader appropriately to the latest offset as we 724 // read from the source. 725 progressReader := newHook(reader, opts.Progress) 726 727 // This function does not calculate sha256 and md5sum for payload. 728 // Execute put object. 729 return c.putObjectDo(ctx, bucketName, objectName, progressReader, md5Base64, "", size, opts) 730 } 731 732 // putObjectDo - executes the put object http operation. 733 // NOTE: You must have WRITE permissions on a bucket to add an object to it. 734 func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) { 735 // Input validation. 736 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 737 return UploadInfo{}, err 738 } 739 if err := s3utils.CheckValidObjectName(objectName); err != nil { 740 return UploadInfo{}, err 741 } 742 // Set headers. 743 customHeader := opts.Header() 744 745 // Add CRC when client supports it, MD5 is not set, not Google and we don't add SHA256 to chunks. 746 addCrc := c.trailingHeaderSupport && md5Base64 == "" && !s3utils.IsGoogleEndpoint(*c.endpointURL) && (opts.DisableContentSha256 || c.secure) 747 748 if addCrc { 749 // If user has added checksums, don't add them ourselves. 750 for k := range opts.UserMetadata { 751 if strings.HasPrefix(strings.ToLower(k), "x-amz-checksum-") { 752 addCrc = false 753 } 754 } 755 } 756 // Populate request metadata. 757 reqMetadata := requestMetadata{ 758 bucketName: bucketName, 759 objectName: objectName, 760 customHeader: customHeader, 761 contentBody: reader, 762 contentLength: size, 763 contentMD5Base64: md5Base64, 764 contentSHA256Hex: sha256Hex, 765 streamSha256: !opts.DisableContentSha256, 766 addCrc: addCrc, 767 } 768 if opts.Internal.SourceVersionID != "" { 769 if opts.Internal.SourceVersionID != nullVersionID { 770 if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil { 771 return UploadInfo{}, errInvalidArgument(err.Error()) 772 } 773 } 774 urlValues := make(url.Values) 775 urlValues.Set("versionId", opts.Internal.SourceVersionID) 776 reqMetadata.queryValues = urlValues 777 } 778 779 // Execute PUT an objectName. 780 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 781 defer closeResponse(resp) 782 if err != nil { 783 return UploadInfo{}, err 784 } 785 if resp != nil { 786 if resp.StatusCode != http.StatusOK { 787 return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName) 788 } 789 } 790 791 // extract lifecycle expiry date and rule ID 792 expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) 793 h := resp.Header 794 return UploadInfo{ 795 Bucket: bucketName, 796 Key: objectName, 797 ETag: trimEtag(h.Get("ETag")), 798 VersionID: h.Get(amzVersionID), 799 Size: size, 800 Expiration: expTime, 801 ExpirationRuleID: ruleID, 802 803 // Checksum values 804 ChecksumCRC32: h.Get("x-amz-checksum-crc32"), 805 ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), 806 ChecksumSHA1: h.Get("x-amz-checksum-sha1"), 807 ChecksumSHA256: h.Get("x-amz-checksum-sha256"), 808 }, nil 809 }