gtsocial-umbx


api-put-object-streaming.go (25127B)


/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"hash/crc32"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"sync"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads.
//
// The following code handles these types of readers:
//
//   - *minio.Object
//   - Any reader which has a method 'ReadAt()'
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
	if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
		info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
	} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
		// If the reader implements ReadAt and is not a *minio.Object, use the parallel uploader.
		info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
	} else {
		info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
	}
	if err != nil {
		errResp := ToErrorResponse(err)
		// If multipart functionality is not available, fall
		// back to a single PutObject operation.
		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
			// Verify if the size of the reader is greater than '5GiB'.
			if size > maxSinglePutObjectSize {
				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
			}
			// Fall back to uploading as a single PutObject operation.
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
	}
	return info, err
}
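
// A minimal caller sketch (hypothetical, not part of this file): the three
// codepaths above are selected purely from PutObjectOptions and the reader's
// capabilities. With ConcurrentStreamParts and NumThreads > 1, the parallel
// streaming path is taken even for readers that lack ReadAt:
//
//	opts := minio.PutObjectOptions{
//		ConcurrentStreamParts: true,     // select the parallel streaming path
//		NumThreads:            4,        // must be > 1 for that path
//		PartSize:              16 << 20, // 16 MiB parts (assumed value)
//	}
//	info, err := client.PutObject(ctx, "my-bucket", "my-object", reader, -1, opts)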

// uploadedPartRes - the response received from a part upload.
type uploadedPartRes struct {
	Error   error // Any error encountered while uploading the part.
	PartNum int   // Number of the part uploaded.
	Size    int64 // Size of the part uploaded.
	Part    ObjectPart
}

type uploadPartReq struct {
	PartNum int        // Number of the part to upload.
	Part    ObjectPart // Part metadata, filled in once uploaded.
}

// putObjectMultipartStreamFromReadAt - Uploads files bigger than 128MiB.
// Supports all readers which implement the io.ReaderAt interface
// (ReadAt method).
//
// NOTE: This function is meant to be used for all readers which
// implement io.ReaderAt, which allows us to resume multipart
// uploads by reading at an offset, avoiding re-reading data that
// was already uploaded. Internally this function uses temporary
// files for staging all the data; these temporary files are
// cleaned up automatically when the caller (i.e., the http client)
// closes the stream after uploading all the contents successfully.
func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	withChecksum := c.trailingHeaderSupport
	if withChecksum {
		if opts.UserMetadata == nil {
			opts.UserMetadata = make(map[string]string, 1)
		}
		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
	}
	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}
	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

	// Abort the multipart upload in progress if the function
	// returns any error; since we do not resume, we purge the
	// parts which have been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to the server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Declare a channel that sends the next part number to be uploaded.
	uploadPartsCh := make(chan uploadPartReq)

	// Declare a channel that sends back the response of a part upload.
	uploadedPartsCh := make(chan uploadedPartRes)

	// Used for readability, lastPartNumber is always totalPartsCount.
	lastPartNumber := totalPartsCount

	partitionCtx, partitionCancel := context.WithCancel(ctx)
	defer partitionCancel()
	// Send each part number to the channel to be processed.
	go func() {
		defer close(uploadPartsCh)

		for p := 1; p <= totalPartsCount; p++ {
			select {
			case <-partitionCtx.Done():
				return
			case uploadPartsCh <- uploadPartReq{PartNum: p}:
			}
		}
	}()

	// Receive each part number from the channel, allowing up to
	// opts.getNumThreads() parallel uploads.
	for w := 1; w <= opts.getNumThreads(); w++ {
		go func(partSize int64) {
			for {
				var uploadReq uploadPartReq
				var ok bool
				select {
				case <-ctx.Done():
					return
				case uploadReq, ok = <-uploadPartsCh:
					if !ok {
						return
					}
					// Each worker will draw from the part channel and upload in parallel.
				}

				// Calculate this part's read offset as a
				// multiple of partSize.
				readOffset := int64(uploadReq.PartNum-1) * partSize

				// As a special case, if partNumber is lastPartNumber, we
				// calculate the offset based on the last part size.
				if uploadReq.PartNum == lastPartNumber {
					readOffset = size - lastPartSize
					partSize = lastPartSize
				}

				sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
				trailer := make(http.Header, 1)
				if withChecksum {
					crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
					trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
					sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
						trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
					})
				}

				// Proceed to upload the part.
				p := uploadPartParams{
					bucketName:   bucketName,
					objectName:   objectName,
					uploadID:     uploadID,
					reader:       sectionReader,
					partNumber:   uploadReq.PartNum,
					size:         partSize,
					sse:          opts.ServerSideEncryption,
					streamSha256: !opts.DisableContentSha256,
					sha256Hex:    "",
					trailer:      trailer,
				}
				objPart, err := c.uploadPart(ctx, p)
				if err != nil {
					uploadedPartsCh <- uploadedPartRes{
						Error: err,
					}
					// Exit the goroutine.
					return
				}

				// Save successfully uploaded part metadata.
				uploadReq.Part = objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
					Size:    objPart.Size,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
				}
			}
		}(partSize)
	}

	// Gather the responses as they occur and update any
	// progress bar.
	for u := 1; u <= totalPartsCount; u++ {
		select {
		case <-ctx.Done():
			return UploadInfo{}, ctx.Err()
		case uploadRes := <-uploadedPartsCh:
			if uploadRes.Error != nil {
				return UploadInfo{}, uploadRes.Error
			}

			// Update the totalUploadedSize.
			totalUploadedSize += uploadRes.Size
			complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
				ETag:           uploadRes.Part.ETag,
				PartNumber:     uploadRes.Part.PartNumber,
				ChecksumCRC32:  uploadRes.Part.ChecksumCRC32,
				ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
				ChecksumSHA1:   uploadRes.Part.ChecksumSHA1,
				ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
			})
		}
	}

	// Verify if we uploaded all the data.
	if totalUploadedSize != size {
		return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	opts = PutObjectOptions{
		ServerSideEncryption: opts.ServerSideEncryption,
	}
	if withChecksum {
		// Add hash of hashes.
		crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
		for _, part := range complMultipartUpload.Parts {
			cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
			if err == nil {
				crc.Write(cs)
			}
		}
		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
	}

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
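
// Offset arithmetic sketch (illustrative, not part of this file): each worker
// above slices its part out of the shared io.ReaderAt with io.NewSectionReader,
// so parts can be read independently and retried without re-reading earlier
// data. A self-contained version of the same calculation:
//
//	func partSection(r io.ReaderAt, partNum, lastPart int, partSize, lastPartSize, size int64) *io.SectionReader {
//		if partNum == lastPart {
//			// The final part starts where the full-size parts end.
//			return io.NewSectionReader(r, size-lastPartSize, lastPartSize)
//		}
//		return io.NewSectionReader(r, int64(partNum-1)*partSize, partSize)
//	}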

func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	if !opts.SendContentMd5 {
		if opts.UserMetadata == nil {
			opts.UserMetadata = make(map[string]string, 1)
		}
		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}
	// Initiates a new multipart request
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}
	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

	// Abort the multipart upload if the function returns
	// any error; since we do not resume, we purge the
	// parts which have been uploaded to relinquish
	// storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Create checksums.
	// CRC32C is ~50% faster on AMD64 @ 30GB/s.
	var crcBytes []byte
	customHeader := make(http.Header)
	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	md5Hash := c.md5Hasher()
	defer md5Hash.Close()

	// Total data read and written to the server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a buffer.
	buf := make([]byte, partSize)

	// Avoid declaring variables in the for loop
	var md5Base64 string

	// Part number always starts with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
		// Proceed to upload the part.
		if partNumber == totalPartsCount {
			partSize = lastPartSize
		}

		length, rerr := readFull(reader, buf)
		if rerr == io.EOF && partNumber > 1 {
			break
		}

		if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
			return UploadInfo{}, rerr
		}

		// Calculate md5sum.
		if opts.SendContentMd5 {
			md5Hash.Reset()
			md5Hash.Write(buf[:length])
			md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
		} else {
			// Add CRC32C instead.
			crc.Reset()
			crc.Write(buf[:length])
			cSum := crc.Sum(nil)
			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
			crcBytes = append(crcBytes, cSum...)
		}

		// Update progress reader appropriately to the latest offset
		// as we read from the source.
		hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)
		p := uploadPartParams{
			bucketName:   bucketName,
			objectName:   objectName,
			uploadID:     uploadID,
			reader:       hooked,
			partNumber:   partNumber,
			md5Base64:    md5Base64,
			size:         partSize,
			sse:          opts.ServerSideEncryption,
			streamSha256: !opts.DisableContentSha256,
			customHeader: customHeader,
		}
		objPart, uerr := c.uploadPart(ctx, p)
		if uerr != nil {
			return UploadInfo{}, uerr
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += partSize
	}

	// Verify if we uploaded all the data.
	if size > 0 {
		if totalUploadedSize != size {
			return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
		}
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over total uploaded parts to save them in
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:           part.ETag,
			PartNumber:     part.PartNumber,
			ChecksumCRC32:  part.ChecksumCRC32,
			ChecksumCRC32C: part.ChecksumCRC32C,
			ChecksumSHA1:   part.ChecksumSHA1,
			ChecksumSHA256: part.ChecksumSHA256,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	opts = PutObjectOptions{
		ServerSideEncryption: opts.ServerSideEncryption,
	}
	if len(crcBytes) > 0 {
		// Add hash of hashes.
		crc.Reset()
		crc.Write(crcBytes)
		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
	}
	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
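
// Composite checksum sketch (illustrative, not part of this file): the
// "hash of hashes" above mirrors how S3 derives a multipart object's
// composite CRC32C, namely CRC32C computed over the concatenated raw
// per-part digests. The same computation in isolation, assuming each
// element of partSums is the 4-byte CRC32C of one part:
//
//	func compositeCRC32C(partSums [][]byte) string {
//		crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
//		for _, s := range partSums {
//			crc.Write(s)
//		}
//		return base64.StdEncoding.EncodeToString(crc.Sum(nil))
//	}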

// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
	reader io.Reader, opts PutObjectOptions,
) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}

	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	if !opts.SendContentMd5 {
		if opts.UserMetadata == nil {
			opts.UserMetadata = make(map[string]string, 1)
		}
		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
	}

	// Cancel all when an error occurs.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	// Initiates a new multipart request
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}
	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

	// Abort the multipart upload if the function returns
	// any error; since we do not resume, we purge the
	// parts which have been uploaded to relinquish
	// storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Create checksums.
	// CRC32C is ~50% faster on AMD64 @ 30GB/s.
	var crcBytes []byte
	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	// Total data read and written to the server. Should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a pool of reusable part buffers.
	nBuffers := int64(opts.NumThreads)
	bufs := make(chan []byte, nBuffers)
	all := make([]byte, nBuffers*partSize)
	for i := int64(0); i < nBuffers; i++ {
		bufs <- all[i*partSize : i*partSize+partSize]
	}

	var wg sync.WaitGroup
	var mu sync.Mutex
	errCh := make(chan error, opts.NumThreads)

	reader = newHook(reader, opts.Progress)

	// Part number always starts with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
		// Proceed to upload the part.
		var buf []byte
		select {
		case buf = <-bufs:
		case err = <-errCh:
			cancel()
			wg.Wait()
			return UploadInfo{}, err
		}

		if int64(len(buf)) != partSize {
			return UploadInfo{}, fmt.Errorf("read buffer length %d does not match expected partSize %d", len(buf), partSize)
		}

		length, rerr := readFull(reader, buf)
		if rerr == io.EOF && partNumber > 1 {
			// Done
			break
		}

		if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
			cancel()
			wg.Wait()
			return UploadInfo{}, rerr
		}

		// Checksum the part: CRC32C here, or MD5 below in the
		// upload goroutine when opts.SendContentMd5 is set.
		customHeader := make(http.Header)
		if !opts.SendContentMd5 {
			// Add CRC32C instead.
			crc.Reset()
			crc.Write(buf[:length])
			cSum := crc.Sum(nil)
			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
			crcBytes = append(crcBytes, cSum...)
		}

		wg.Add(1)
		go func(partNumber int) {
			// Avoid declaring variables in the for loop
			var md5Base64 string

			if opts.SendContentMd5 {
				md5Hash := c.md5Hasher()
				md5Hash.Write(buf[:length])
				md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
				md5Hash.Close()
			}

			defer wg.Done()
			p := uploadPartParams{
				bucketName:   bucketName,
				objectName:   objectName,
				uploadID:     uploadID,
				reader:       bytes.NewReader(buf[:length]),
				partNumber:   partNumber,
				md5Base64:    md5Base64,
				size:         int64(length),
				sse:          opts.ServerSideEncryption,
				streamSha256: !opts.DisableContentSha256,
				customHeader: customHeader,
			}
			objPart, uerr := c.uploadPart(ctx, p)
			if uerr != nil {
				errCh <- uerr
				return
			}

			// Save successfully uploaded part metadata.
			mu.Lock()
			partsInfo[partNumber] = objPart
			mu.Unlock()

			// Send buffer back so it can be reused.
			bufs <- buf
		}(partNumber)

		// Save successfully uploaded size.
		totalUploadedSize += int64(length)
	}
	wg.Wait()

	// Collect any error
	select {
	case err = <-errCh:
		return UploadInfo{}, err
	default:
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over total uploaded parts to save them in
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:           part.ETag,
			PartNumber:     part.PartNumber,
			ChecksumCRC32:  part.ChecksumCRC32,
			ChecksumCRC32C: part.ChecksumCRC32C,
			ChecksumSHA1:   part.ChecksumSHA1,
			ChecksumSHA256: part.ChecksumSHA256,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	opts = PutObjectOptions{}
	if len(crcBytes) > 0 {
		// Add hash of hashes.
		crc.Reset()
		crc.Write(crcBytes)
		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
	}
	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
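
// Buffer-pool sketch (illustrative, not part of this file): the parallel path
// above bounds memory by handing out opts.NumThreads fixed-size buffers through
// a buffered channel; the submit loop blocks until a worker returns a buffer,
// so at most NumThreads parts are ever in flight. The pattern in isolation,
// with assumed values for n and partSize:
//
//	n, partSize := 4, int64(16<<20)
//	bufs := make(chan []byte, n)
//	backing := make([]byte, int64(n)*partSize)
//	for i := int64(0); i < int64(n); i++ {
//		bufs <- backing[i*partSize : (i+1)*partSize]
//	}
//	buf := <-bufs // blocks until a buffer is free
//	// ... fill buf and upload it in a goroutine ...
//	bufs <- buf // the goroutine recycles the buffer when done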

// putObject - a special function used for Google Cloud Storage, since
// Google's multipart API is not S3 compatible.
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Size -1 is only supported on Google Cloud Storage; we error
	// out in all other situations.
	if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
	}

	if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
		return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
	}

	var readSeeker io.Seeker
	if size > 0 {
		if isReadAt(reader) && !isObject(reader) {
			seeker, ok := reader.(io.Seeker)
			if ok {
				offset, err := seeker.Seek(0, io.SeekCurrent)
				if err != nil {
					return UploadInfo{}, errInvalidArgument(err.Error())
				}
				reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
				readSeeker = reader.(io.Seeker)
			}
		}
	}

	var md5Base64 string
	if opts.SendContentMd5 {
		// Calculate md5sum.
		hash := c.md5Hasher()

		if readSeeker != nil {
			if _, err := io.Copy(hash, reader); err != nil {
				return UploadInfo{}, err
			}
			// Seek back to the beginning of the section reader.
			_, err = readSeeker.Seek(0, io.SeekStart)
			if err != nil {
				return UploadInfo{}, errInvalidArgument(err.Error())
			}
		} else {
			// Create a buffer.
			buf := make([]byte, size)

			length, err := readFull(reader, buf)
			if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
				return UploadInfo{}, err
			}

			hash.Write(buf[:length])
			reader = bytes.NewReader(buf[:length])
		}

		md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
		hash.Close()
	}

	// Update progress reader appropriately to the latest offset as we
	// read from the source.
	progressReader := newHook(reader, opts.Progress)

	// This function does not calculate sha256 and md5sum for payload.
	// Execute put object.
	return c.putObjectDo(ctx, bucketName, objectName, progressReader, md5Base64, "", size, opts)
}

// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}
	// Set headers.
	customHeader := opts.Header()

	// Add CRC when the client supports it, MD5 is not set, the endpoint
	// is not Google, and we don't add SHA256 to chunks.
	addCrc := c.trailingHeaderSupport && md5Base64 == "" && !s3utils.IsGoogleEndpoint(*c.endpointURL) && (opts.DisableContentSha256 || c.secure)

	if addCrc {
		// If the user has added checksums, don't add them ourselves.
		for k := range opts.UserMetadata {
			if strings.HasPrefix(strings.ToLower(k), "x-amz-checksum-") {
				addCrc = false
			}
		}
	}
	// Populate request metadata.
	reqMetadata := requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		customHeader:     customHeader,
		contentBody:      reader,
		contentLength:    size,
		contentMD5Base64: md5Base64,
		contentSHA256Hex: sha256Hex,
		streamSha256:     !opts.DisableContentSha256,
		addCrc:           addCrc,
	}
	if opts.Internal.SourceVersionID != "" {
		if opts.Internal.SourceVersionID != nullVersionID {
			if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
				return UploadInfo{}, errInvalidArgument(err.Error())
			}
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", opts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Execute PUT on objectName.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return UploadInfo{}, err
	}
	if resp != nil {
		if resp.StatusCode != http.StatusOK {
			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
		}
	}

	// extract lifecycle expiry date and rule ID
	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
	h := resp.Header
	return UploadInfo{
		Bucket:           bucketName,
		Key:              objectName,
		ETag:             trimEtag(h.Get("ETag")),
		VersionID:        h.Get(amzVersionID),
		Size:             size,
		Expiration:       expTime,
		ExpirationRuleID: ruleID,

		// Checksum values
		ChecksumCRC32:  h.Get("x-amz-checksum-crc32"),
		ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
		ChecksumSHA1:   h.Get("x-amz-checksum-sha1"),
		ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
	}, nil
}
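
// Checksum override sketch (hypothetical values, not part of this file): the
// addCrc logic above is suppressed whenever the caller already supplies any
// "x-amz-checksum-*" entry via UserMetadata, so a caller-provided digest
// disables the client-side CRC:
//
//	opts := minio.PutObjectOptions{UserMetadata: map[string]string{
//		"x-amz-checksum-sha256": "<base64-encoded SHA-256>", // presence sets addCrc = false
//	}}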