
Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-put-object.go (14861B)

      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2015-2017 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     18 package minio
     20 import (
     21 	"bytes"
     22 	"context"
     23 	"encoding/base64"
     24 	"errors"
     25 	"fmt"
     26 	"hash/crc32"
     27 	"io"
     28 	"net/http"
     29 	"sort"
     30 	"time"
     32 	""
     33 	""
     34 	""
     35 )
     37 // ReplicationStatus represents replication status of object
     38 type ReplicationStatus string
     40 const (
     41 	// ReplicationStatusPending indicates replication is pending
     42 	ReplicationStatusPending ReplicationStatus = "PENDING"
     43 	// ReplicationStatusComplete indicates replication completed ok
     44 	ReplicationStatusComplete ReplicationStatus = "COMPLETED"
     45 	// ReplicationStatusFailed indicates replication failed
     46 	ReplicationStatusFailed ReplicationStatus = "FAILED"
     47 	// ReplicationStatusReplica indicates object is a replica of a source
     48 	ReplicationStatusReplica ReplicationStatus = "REPLICA"
     49 )
     51 // Empty returns true if no replication status set.
     52 func (r ReplicationStatus) Empty() bool {
     53 	return r == ""
     54 }
     56 // AdvancedPutOptions for internal use - to be utilized by replication, ILM transition
     57 // implementation on MinIO server
     58 type AdvancedPutOptions struct {
     59 	SourceVersionID    string
     60 	SourceETag         string
     61 	ReplicationStatus  ReplicationStatus
     62 	SourceMTime        time.Time
     63 	ReplicationRequest bool
     64 	RetentionTimestamp time.Time
     65 	TaggingTimestamp   time.Time
     66 	LegalholdTimestamp time.Time
     67 }
     69 // PutObjectOptions represents options specified by user for PutObject call
     70 type PutObjectOptions struct {
     71 	UserMetadata            map[string]string
     72 	UserTags                map[string]string
     73 	Progress                io.Reader
     74 	ContentType             string
     75 	ContentEncoding         string
     76 	ContentDisposition      string
     77 	ContentLanguage         string
     78 	CacheControl            string
     79 	Mode                    RetentionMode
     80 	RetainUntilDate         time.Time
     81 	ServerSideEncryption    encrypt.ServerSide
     82 	NumThreads              uint
     83 	StorageClass            string
     84 	WebsiteRedirectLocation string
     85 	PartSize                uint64
     86 	LegalHold               LegalHoldStatus
     87 	SendContentMd5          bool
     88 	DisableContentSha256    bool
     89 	DisableMultipart        bool
     91 	// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
     92 	// fill them serially and upload them in parallel.
     93 	// This can be used for faster uploads on non-seekable or slow-to-seek input.
     94 	ConcurrentStreamParts bool
     95 	Internal              AdvancedPutOptions
     97 	customHeaders http.Header
     98 }
    100 // SetMatchETag if etag matches while PUT MinIO returns an error
    101 // this is a MinIO specific extension to support optimistic locking
    102 // semantics.
    103 func (opts *PutObjectOptions) SetMatchETag(etag string) {
    104 	if opts.customHeaders == nil {
    105 		opts.customHeaders = http.Header{}
    106 	}
    107 	opts.customHeaders.Set("If-Match", "\""+etag+"\"")
    108 }
    110 // SetMatchETagExcept if etag does not match while PUT MinIO returns an
    111 // error this is a MinIO specific extension to support optimistic locking
    112 // semantics.
    113 func (opts *PutObjectOptions) SetMatchETagExcept(etag string) {
    114 	if opts.customHeaders == nil {
    115 		opts.customHeaders = http.Header{}
    116 	}
    117 	opts.customHeaders.Set("If-None-Match", "\""+etag+"\"")
    118 }
    120 // getNumThreads - gets the number of threads to be used in the multipart
    121 // put object operation
    122 func (opts PutObjectOptions) getNumThreads() (numThreads int) {
    123 	if opts.NumThreads > 0 {
    124 		numThreads = int(opts.NumThreads)
    125 	} else {
    126 		numThreads = totalWorkers
    127 	}
    128 	return
    129 }
    131 // Header - constructs the headers from metadata entered by user in
    132 // PutObjectOptions struct
    133 func (opts PutObjectOptions) Header() (header http.Header) {
    134 	header = make(http.Header)
    136 	contentType := opts.ContentType
    137 	if contentType == "" {
    138 		contentType = "application/octet-stream"
    139 	}
    140 	header.Set("Content-Type", contentType)
    142 	if opts.ContentEncoding != "" {
    143 		header.Set("Content-Encoding", opts.ContentEncoding)
    144 	}
    145 	if opts.ContentDisposition != "" {
    146 		header.Set("Content-Disposition", opts.ContentDisposition)
    147 	}
    148 	if opts.ContentLanguage != "" {
    149 		header.Set("Content-Language", opts.ContentLanguage)
    150 	}
    151 	if opts.CacheControl != "" {
    152 		header.Set("Cache-Control", opts.CacheControl)
    153 	}
    155 	if opts.Mode != "" {
    156 		header.Set(amzLockMode, opts.Mode.String())
    157 	}
    159 	if !opts.RetainUntilDate.IsZero() {
    160 		header.Set("X-Amz-Object-Lock-Retain-Until-Date", opts.RetainUntilDate.Format(time.RFC3339))
    161 	}
    163 	if opts.LegalHold != "" {
    164 		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
    165 	}
    167 	if opts.ServerSideEncryption != nil {
    168 		opts.ServerSideEncryption.Marshal(header)
    169 	}
    171 	if opts.StorageClass != "" {
    172 		header.Set(amzStorageClass, opts.StorageClass)
    173 	}
    175 	if opts.WebsiteRedirectLocation != "" {
    176 		header.Set(amzWebsiteRedirectLocation, opts.WebsiteRedirectLocation)
    177 	}
    179 	if !opts.Internal.ReplicationStatus.Empty() {
    180 		header.Set(amzBucketReplicationStatus, string(opts.Internal.ReplicationStatus))
    181 	}
    182 	if !opts.Internal.SourceMTime.IsZero() {
    183 		header.Set(minIOBucketSourceMTime, opts.Internal.SourceMTime.Format(time.RFC3339Nano))
    184 	}
    185 	if opts.Internal.SourceETag != "" {
    186 		header.Set(minIOBucketSourceETag, opts.Internal.SourceETag)
    187 	}
    188 	if opts.Internal.ReplicationRequest {
    189 		header.Set(minIOBucketReplicationRequest, "true")
    190 	}
    191 	if !opts.Internal.LegalholdTimestamp.IsZero() {
    192 		header.Set(minIOBucketReplicationObjectLegalHoldTimestamp, opts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
    193 	}
    194 	if !opts.Internal.RetentionTimestamp.IsZero() {
    195 		header.Set(minIOBucketReplicationObjectRetentionTimestamp, opts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
    196 	}
    197 	if !opts.Internal.TaggingTimestamp.IsZero() {
    198 		header.Set(minIOBucketReplicationTaggingTimestamp, opts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
    199 	}
    201 	if len(opts.UserTags) != 0 {
    202 		header.Set(amzTaggingHeader, s3utils.TagEncode(opts.UserTags))
    203 	}
    205 	for k, v := range opts.UserMetadata {
    206 		if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
    207 			header.Set(k, v)
    208 		} else {
    209 			header.Set("x-amz-meta-"+k, v)
    210 		}
    211 	}
    213 	// set any other additional custom headers.
    214 	for k, v := range opts.customHeaders {
    215 		header[k] = v
    216 	}
    218 	return
    219 }
    221 // validate() checks if the UserMetadata map has standard headers or and raises an error if so.
    222 func (opts PutObjectOptions) validate() (err error) {
    223 	for k, v := range opts.UserMetadata {
    224 		if !httpguts.ValidHeaderFieldName(k) || isStandardHeader(k) || isSSEHeader(k) || isStorageClassHeader(k) {
    225 			return errInvalidArgument(k + " unsupported user defined metadata name")
    226 		}
    227 		if !httpguts.ValidHeaderFieldValue(v) {
    228 			return errInvalidArgument(v + " unsupported user defined metadata value")
    229 		}
    230 	}
    231 	if opts.Mode != "" && !opts.Mode.IsValid() {
    232 		return errInvalidArgument(opts.Mode.String() + " unsupported retention mode")
    233 	}
    234 	if opts.LegalHold != "" && !opts.LegalHold.IsValid() {
    235 		return errInvalidArgument(opts.LegalHold.String() + " unsupported legal-hold status")
    236 	}
    237 	return nil
    238 }
    240 // completedParts is a collection of parts sortable by their part numbers.
    241 // used for sorting the uploaded parts before completing the multipart request.
    242 type completedParts []CompletePart
    244 func (a completedParts) Len() int           { return len(a) }
    245 func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
    246 func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
    248 // PutObject creates an object in a bucket.
    249 //
    250 // You must have WRITE permissions on a bucket to create an object.
    251 //
    252 //   - For size smaller than 16MiB PutObject automatically does a
    253 //     single atomic PUT operation.
    254 //
    255 //   - For size larger than 16MiB PutObject automatically does a
    256 //     multipart upload operation.
    257 //
    258 //   - For size input as -1 PutObject does a multipart Put operation
    259 //     until input stream reaches EOF. Maximum object size that can
    260 //     be uploaded through this operation will be 5TiB.
    261 //
    262 //     WARNING: Passing down '-1' will use memory and these cannot
    263 //     be reused for best outcomes for PutObject(), pass the size always.
    264 //
    265 // NOTE: Upon errors during upload multipart operation is entirely aborted.
    266 func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
    267 	opts PutObjectOptions,
    268 ) (info UploadInfo, err error) {
    269 	if objectSize < 0 && opts.DisableMultipart {
    270 		return UploadInfo{}, errors.New("object size must be provided with disable multipart upload")
    271 	}
    273 	err = opts.validate()
    274 	if err != nil {
    275 		return UploadInfo{}, err
    276 	}
    278 	return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
    279 }
    281 func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
    282 	// Check for largest object size allowed.
    283 	if size > int64(maxMultipartPutObjectSize) {
    284 		return UploadInfo{}, errEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
    285 	}
    287 	// NOTE: Streaming signature is not supported by GCS.
    288 	if s3utils.IsGoogleEndpoint(*c.endpointURL) {
    289 		return c.putObject(ctx, bucketName, objectName, reader, size, opts)
    290 	}
    292 	partSize := opts.PartSize
    293 	if opts.PartSize == 0 {
    294 		partSize = minPartSize
    295 	}
    297 	if c.overrideSignerType.IsV2() {
    298 		if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
    299 			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
    300 		}
    301 		return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
    302 	}
    304 	if size < 0 {
    305 		if opts.DisableMultipart {
    306 			return UploadInfo{}, errors.New("no length provided and multipart disabled")
    307 		}
    308 		if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
    309 			return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
    310 		}
    311 		return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
    312 	}
    314 	if size < int64(partSize) || opts.DisableMultipart {
    315 		return c.putObject(ctx, bucketName, objectName, reader, size, opts)
    316 	}
    318 	return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
    319 }
    321 func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
    322 	// Input validation.
    323 	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
    324 		return UploadInfo{}, err
    325 	}
    326 	if err = s3utils.CheckValidObjectName(objectName); err != nil {
    327 		return UploadInfo{}, err
    328 	}
    330 	// Total data read and written to server. should be equal to
    331 	// 'size' at the end of the call.
    332 	var totalUploadedSize int64
    334 	// Complete multipart upload.
    335 	var complMultipartUpload completeMultipartUpload
    337 	// Calculate the optimal parts info for a given size.
    338 	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
    339 	if err != nil {
    340 		return UploadInfo{}, err
    341 	}
    343 	if !opts.SendContentMd5 {
    344 		if opts.UserMetadata == nil {
    345 			opts.UserMetadata = make(map[string]string, 1)
    346 		}
    347 		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
    348 	}
    350 	// Initiate a new multipart upload.
    351 	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
    352 	if err != nil {
    353 		return UploadInfo{}, err
    354 	}
    355 	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
    357 	defer func() {
    358 		if err != nil {
    359 			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
    360 		}
    361 	}()
    363 	// Part number always starts with '1'.
    364 	partNumber := 1
    366 	// Initialize parts uploaded map.
    367 	partsInfo := make(map[int]ObjectPart)
    369 	// Create a buffer.
    370 	buf := make([]byte, partSize)
    372 	// Create checksums
    373 	// CRC32C is ~50% faster on AMD64 @ 30GB/s
    374 	var crcBytes []byte
    375 	customHeader := make(http.Header)
    376 	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
    378 	for partNumber <= totalPartsCount {
    379 		length, rerr := readFull(reader, buf)
    380 		if rerr == io.EOF && partNumber > 1 {
    381 			break
    382 		}
    384 		if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
    385 			return UploadInfo{}, rerr
    386 		}
    388 		var md5Base64 string
    389 		if opts.SendContentMd5 {
    390 			// Calculate md5sum.
    391 			hash := c.md5Hasher()
    392 			hash.Write(buf[:length])
    393 			md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
    394 			hash.Close()
    395 		} else {
    396 			crc.Reset()
    397 			crc.Write(buf[:length])
    398 			cSum := crc.Sum(nil)
    399 			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
    400 			crcBytes = append(crcBytes, cSum...)
    401 		}
    403 		// Update progress reader appropriately to the latest offset
    404 		// as we read from the source.
    405 		rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
    407 		// Proceed to upload the part.
    408 		p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
    409 		objPart, uerr := c.uploadPart(ctx, p)
    410 		if uerr != nil {
    411 			return UploadInfo{}, uerr
    412 		}
    414 		// Save successfully uploaded part metadata.
    415 		partsInfo[partNumber] = objPart
    417 		// Save successfully uploaded size.
    418 		totalUploadedSize += int64(length)
    420 		// Increment part number.
    421 		partNumber++
    423 		// For unknown size, Read EOF we break away.
    424 		// We do not have to upload till totalPartsCount.
    425 		if rerr == io.EOF {
    426 			break
    427 		}
    428 	}
    430 	// Loop over total uploaded parts to save them in
    431 	// Parts array before completing the multipart request.
    432 	for i := 1; i < partNumber; i++ {
    433 		part, ok := partsInfo[i]
    434 		if !ok {
    435 			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
    436 		}
    437 		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
    438 			ETag:           part.ETag,
    439 			PartNumber:     part.PartNumber,
    440 			ChecksumCRC32:  part.ChecksumCRC32,
    441 			ChecksumCRC32C: part.ChecksumCRC32C,
    442 			ChecksumSHA1:   part.ChecksumSHA1,
    443 			ChecksumSHA256: part.ChecksumSHA256,
    444 		})
    445 	}
    447 	// Sort all completed parts.
    448 	sort.Sort(completedParts(complMultipartUpload.Parts))
    450 	opts = PutObjectOptions{}
    451 	if len(crcBytes) > 0 {
    452 		// Add hash of hashes.
    453 		crc.Reset()
    454 		crc.Write(crcBytes)
    455 		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
    456 	}
    457 	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
    458 	if err != nil {
    459 		return UploadInfo{}, err
    460 	}
    462 	uploadInfo.Size = totalUploadedSize
    463 	return uploadInfo, nil
    464 }