gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-put-object-multipart.go (15167B)


      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2015-2017 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package minio
     19 
     20 import (
     21 	"bytes"
     22 	"context"
     23 	"encoding/base64"
     24 	"encoding/hex"
     25 	"encoding/xml"
     26 	"fmt"
     27 	"hash/crc32"
     28 	"io"
     29 	"net/http"
     30 	"net/url"
     31 	"sort"
     32 	"strconv"
     33 	"strings"
     34 
     35 	"github.com/google/uuid"
     36 	"github.com/minio/minio-go/v7/pkg/encrypt"
     37 	"github.com/minio/minio-go/v7/pkg/s3utils"
     38 )
     39 
     40 func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
     41 	opts PutObjectOptions,
     42 ) (info UploadInfo, err error) {
     43 	info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
     44 	if err != nil {
     45 		errResp := ToErrorResponse(err)
     46 		// Verify if multipart functionality is not available, if not
     47 		// fall back to single PutObject operation.
     48 		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
     49 			// Verify if size of reader is greater than '5GiB'.
     50 			if size > maxSinglePutObjectSize {
     51 				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
     52 			}
     53 			// Fall back to uploading as single PutObject operation.
     54 			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
     55 		}
     56 	}
     57 	return info, err
     58 }
     59 
     60 func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
     61 	// Input validation.
     62 	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
     63 		return UploadInfo{}, err
     64 	}
     65 	if err = s3utils.CheckValidObjectName(objectName); err != nil {
     66 		return UploadInfo{}, err
     67 	}
     68 
     69 	// Total data read and written to server. should be equal to
     70 	// 'size' at the end of the call.
     71 	var totalUploadedSize int64
     72 
     73 	// Complete multipart upload.
     74 	var complMultipartUpload completeMultipartUpload
     75 
     76 	// Calculate the optimal parts info for a given size.
     77 	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
     78 	if err != nil {
     79 		return UploadInfo{}, err
     80 	}
     81 
     82 	// Choose hash algorithms to be calculated by hashCopyN,
     83 	// avoid sha256 with non-v4 signature request or
     84 	// HTTPS connection.
     85 	hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)
     86 	if len(hashSums) == 0 {
     87 		if opts.UserMetadata == nil {
     88 			opts.UserMetadata = make(map[string]string, 1)
     89 		}
     90 		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
     91 	}
     92 
     93 	// Initiate a new multipart upload.
     94 	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
     95 	if err != nil {
     96 		return UploadInfo{}, err
     97 	}
     98 	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
     99 
    100 	defer func() {
    101 		if err != nil {
    102 			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
    103 		}
    104 	}()
    105 
    106 	// Part number always starts with '1'.
    107 	partNumber := 1
    108 
    109 	// Initialize parts uploaded map.
    110 	partsInfo := make(map[int]ObjectPart)
    111 
    112 	// Create a buffer.
    113 	buf := make([]byte, partSize)
    114 
    115 	// Create checksums
    116 	// CRC32C is ~50% faster on AMD64 @ 30GB/s
    117 	var crcBytes []byte
    118 	customHeader := make(http.Header)
    119 	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
    120 	for partNumber <= totalPartsCount {
    121 		length, rErr := readFull(reader, buf)
    122 		if rErr == io.EOF && partNumber > 1 {
    123 			break
    124 		}
    125 
    126 		if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
    127 			return UploadInfo{}, rErr
    128 		}
    129 
    130 		// Calculates hash sums while copying partSize bytes into cw.
    131 		for k, v := range hashAlgos {
    132 			v.Write(buf[:length])
    133 			hashSums[k] = v.Sum(nil)
    134 			v.Close()
    135 		}
    136 
    137 		// Update progress reader appropriately to the latest offset
    138 		// as we read from the source.
    139 		rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
    140 
    141 		// Checksums..
    142 		var (
    143 			md5Base64 string
    144 			sha256Hex string
    145 		)
    146 
    147 		if hashSums["md5"] != nil {
    148 			md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
    149 		}
    150 		if hashSums["sha256"] != nil {
    151 			sha256Hex = hex.EncodeToString(hashSums["sha256"])
    152 		}
    153 		if len(hashSums) == 0 {
    154 			crc.Reset()
    155 			crc.Write(buf[:length])
    156 			cSum := crc.Sum(nil)
    157 			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
    158 			crcBytes = append(crcBytes, cSum...)
    159 		}
    160 
    161 		p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
    162 		// Proceed to upload the part.
    163 		objPart, uerr := c.uploadPart(ctx, p)
    164 		if uerr != nil {
    165 			return UploadInfo{}, uerr
    166 		}
    167 
    168 		// Save successfully uploaded part metadata.
    169 		partsInfo[partNumber] = objPart
    170 
    171 		// Save successfully uploaded size.
    172 		totalUploadedSize += int64(length)
    173 
    174 		// Increment part number.
    175 		partNumber++
    176 
    177 		// For unknown size, Read EOF we break away.
    178 		// We do not have to upload till totalPartsCount.
    179 		if rErr == io.EOF {
    180 			break
    181 		}
    182 	}
    183 
    184 	// Loop over total uploaded parts to save them in
    185 	// Parts array before completing the multipart request.
    186 	for i := 1; i < partNumber; i++ {
    187 		part, ok := partsInfo[i]
    188 		if !ok {
    189 			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
    190 		}
    191 		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
    192 			ETag:           part.ETag,
    193 			PartNumber:     part.PartNumber,
    194 			ChecksumCRC32:  part.ChecksumCRC32,
    195 			ChecksumCRC32C: part.ChecksumCRC32C,
    196 			ChecksumSHA1:   part.ChecksumSHA1,
    197 			ChecksumSHA256: part.ChecksumSHA256,
    198 		})
    199 	}
    200 
    201 	// Sort all completed parts.
    202 	sort.Sort(completedParts(complMultipartUpload.Parts))
    203 	opts = PutObjectOptions{
    204 		ServerSideEncryption: opts.ServerSideEncryption,
    205 	}
    206 	if len(crcBytes) > 0 {
    207 		// Add hash of hashes.
    208 		crc.Reset()
    209 		crc.Write(crcBytes)
    210 		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
    211 	}
    212 	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
    213 	if err != nil {
    214 		return UploadInfo{}, err
    215 	}
    216 
    217 	uploadInfo.Size = totalUploadedSize
    218 	return uploadInfo, nil
    219 }
    220 
    221 // initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
    222 func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
    223 	// Input validation.
    224 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    225 		return initiateMultipartUploadResult{}, err
    226 	}
    227 	if err := s3utils.CheckValidObjectName(objectName); err != nil {
    228 		return initiateMultipartUploadResult{}, err
    229 	}
    230 
    231 	// Initialize url queries.
    232 	urlValues := make(url.Values)
    233 	urlValues.Set("uploads", "")
    234 
    235 	if opts.Internal.SourceVersionID != "" {
    236 		if opts.Internal.SourceVersionID != nullVersionID {
    237 			if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
    238 				return initiateMultipartUploadResult{}, errInvalidArgument(err.Error())
    239 			}
    240 		}
    241 		urlValues.Set("versionId", opts.Internal.SourceVersionID)
    242 	}
    243 
    244 	// Set ContentType header.
    245 	customHeader := opts.Header()
    246 
    247 	reqMetadata := requestMetadata{
    248 		bucketName:   bucketName,
    249 		objectName:   objectName,
    250 		queryValues:  urlValues,
    251 		customHeader: customHeader,
    252 	}
    253 
    254 	// Execute POST on an objectName to initiate multipart upload.
    255 	resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
    256 	defer closeResponse(resp)
    257 	if err != nil {
    258 		return initiateMultipartUploadResult{}, err
    259 	}
    260 	if resp != nil {
    261 		if resp.StatusCode != http.StatusOK {
    262 			return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
    263 		}
    264 	}
    265 	// Decode xml for new multipart upload.
    266 	initiateMultipartUploadResult := initiateMultipartUploadResult{}
    267 	err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
    268 	if err != nil {
    269 		return initiateMultipartUploadResult, err
    270 	}
    271 	return initiateMultipartUploadResult, nil
    272 }
    273 
    274 type uploadPartParams struct {
    275 	bucketName   string
    276 	objectName   string
    277 	uploadID     string
    278 	reader       io.Reader
    279 	partNumber   int
    280 	md5Base64    string
    281 	sha256Hex    string
    282 	size         int64
    283 	sse          encrypt.ServerSide
    284 	streamSha256 bool
    285 	customHeader http.Header
    286 	trailer      http.Header
    287 }
    288 
    289 // uploadPart - Uploads a part in a multipart upload.
    290 func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart, error) {
    291 	// Input validation.
    292 	if err := s3utils.CheckValidBucketName(p.bucketName); err != nil {
    293 		return ObjectPart{}, err
    294 	}
    295 	if err := s3utils.CheckValidObjectName(p.objectName); err != nil {
    296 		return ObjectPart{}, err
    297 	}
    298 	if p.size > maxPartSize {
    299 		return ObjectPart{}, errEntityTooLarge(p.size, maxPartSize, p.bucketName, p.objectName)
    300 	}
    301 	if p.size <= -1 {
    302 		return ObjectPart{}, errEntityTooSmall(p.size, p.bucketName, p.objectName)
    303 	}
    304 	if p.partNumber <= 0 {
    305 		return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
    306 	}
    307 	if p.uploadID == "" {
    308 		return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
    309 	}
    310 
    311 	// Get resources properly escaped and lined up before using them in http request.
    312 	urlValues := make(url.Values)
    313 	// Set part number.
    314 	urlValues.Set("partNumber", strconv.Itoa(p.partNumber))
    315 	// Set upload id.
    316 	urlValues.Set("uploadId", p.uploadID)
    317 
    318 	// Set encryption headers, if any.
    319 	if p.customHeader == nil {
    320 		p.customHeader = make(http.Header)
    321 	}
    322 	// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
    323 	// Server-side encryption is supported by the S3 Multipart Upload actions.
    324 	// Unless you are using a customer-provided encryption key, you don't need
    325 	// to specify the encryption parameters in each UploadPart request.
    326 	if p.sse != nil && p.sse.Type() == encrypt.SSEC {
    327 		p.sse.Marshal(p.customHeader)
    328 	}
    329 
    330 	reqMetadata := requestMetadata{
    331 		bucketName:       p.bucketName,
    332 		objectName:       p.objectName,
    333 		queryValues:      urlValues,
    334 		customHeader:     p.customHeader,
    335 		contentBody:      p.reader,
    336 		contentLength:    p.size,
    337 		contentMD5Base64: p.md5Base64,
    338 		contentSHA256Hex: p.sha256Hex,
    339 		streamSha256:     p.streamSha256,
    340 		trailer:          p.trailer,
    341 	}
    342 
    343 	// Execute PUT on each part.
    344 	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
    345 	defer closeResponse(resp)
    346 	if err != nil {
    347 		return ObjectPart{}, err
    348 	}
    349 	if resp != nil {
    350 		if resp.StatusCode != http.StatusOK {
    351 			return ObjectPart{}, httpRespToErrorResponse(resp, p.bucketName, p.objectName)
    352 		}
    353 	}
    354 	// Once successfully uploaded, return completed part.
    355 	h := resp.Header
    356 	objPart := ObjectPart{
    357 		ChecksumCRC32:  h.Get("x-amz-checksum-crc32"),
    358 		ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
    359 		ChecksumSHA1:   h.Get("x-amz-checksum-sha1"),
    360 		ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
    361 	}
    362 	objPart.Size = p.size
    363 	objPart.PartNumber = p.partNumber
    364 	// Trim off the odd double quotes from ETag in the beginning and end.
    365 	objPart.ETag = trimEtag(h.Get("ETag"))
    366 	return objPart, nil
    367 }
    368 
    369 // completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
    370 func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
    371 	complete completeMultipartUpload, opts PutObjectOptions,
    372 ) (UploadInfo, error) {
    373 	// Input validation.
    374 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    375 		return UploadInfo{}, err
    376 	}
    377 	if err := s3utils.CheckValidObjectName(objectName); err != nil {
    378 		return UploadInfo{}, err
    379 	}
    380 
    381 	// Initialize url queries.
    382 	urlValues := make(url.Values)
    383 	urlValues.Set("uploadId", uploadID)
    384 	// Marshal complete multipart body.
    385 	completeMultipartUploadBytes, err := xml.Marshal(complete)
    386 	if err != nil {
    387 		return UploadInfo{}, err
    388 	}
    389 
    390 	headers := opts.Header()
    391 	if s3utils.IsAmazonEndpoint(*c.endpointURL) {
    392 		headers.Del(encrypt.SseKmsKeyID)      // Remove X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id not supported in CompleteMultipartUpload
    393 		headers.Del(encrypt.SseGenericHeader) // Remove X-Amz-Server-Side-Encryption not supported in CompleteMultipartUpload
    394 	}
    395 
    396 	// Instantiate all the complete multipart buffer.
    397 	completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
    398 	reqMetadata := requestMetadata{
    399 		bucketName:       bucketName,
    400 		objectName:       objectName,
    401 		queryValues:      urlValues,
    402 		contentBody:      completeMultipartUploadBuffer,
    403 		contentLength:    int64(len(completeMultipartUploadBytes)),
    404 		contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
    405 		customHeader:     headers,
    406 	}
    407 
    408 	// Execute POST to complete multipart upload for an objectName.
    409 	resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
    410 	defer closeResponse(resp)
    411 	if err != nil {
    412 		return UploadInfo{}, err
    413 	}
    414 	if resp != nil {
    415 		if resp.StatusCode != http.StatusOK {
    416 			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
    417 		}
    418 	}
    419 
    420 	// Read resp.Body into a []bytes to parse for Error response inside the body
    421 	var b []byte
    422 	b, err = io.ReadAll(resp.Body)
    423 	if err != nil {
    424 		return UploadInfo{}, err
    425 	}
    426 	// Decode completed multipart upload response on success.
    427 	completeMultipartUploadResult := completeMultipartUploadResult{}
    428 	err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
    429 	if err != nil {
    430 		// xml parsing failure due to presence an ill-formed xml fragment
    431 		return UploadInfo{}, err
    432 	} else if completeMultipartUploadResult.Bucket == "" {
    433 		// xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
    434 		// In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
    435 		// of the members.
    436 
    437 		// Decode completed multipart upload response on failure
    438 		completeMultipartUploadErr := ErrorResponse{}
    439 		err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
    440 		if err != nil {
    441 			// xml parsing failure due to presence an ill-formed xml fragment
    442 			return UploadInfo{}, err
    443 		}
    444 		return UploadInfo{}, completeMultipartUploadErr
    445 	}
    446 
    447 	// extract lifecycle expiry date and rule ID
    448 	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
    449 
    450 	return UploadInfo{
    451 		Bucket:           completeMultipartUploadResult.Bucket,
    452 		Key:              completeMultipartUploadResult.Key,
    453 		ETag:             trimEtag(completeMultipartUploadResult.ETag),
    454 		VersionID:        resp.Header.Get(amzVersionID),
    455 		Location:         completeMultipartUploadResult.Location,
    456 		Expiration:       expTime,
    457 		ExpirationRuleID: ruleID,
    458 
    459 		ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
    460 		ChecksumSHA1:   completeMultipartUploadResult.ChecksumSHA1,
    461 		ChecksumCRC32:  completeMultipartUploadResult.ChecksumCRC32,
    462 		ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
    463 	}, nil
    464 }