gtsocial-umbx


api-compose-object.go


/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// CopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs
type CopyDestOptions struct {
	Bucket string // points to destination bucket
	Object string // points to destination object

	// `Encryption` is the key info for server-side-encryption with a
	// customer-provided key. If it is nil, no encryption is performed.
	Encryption encrypt.ServerSide

	// `UserMetadata` is the user-metadata key-value pairs to be set on the
	// destination. The keys are automatically prefixed with `x-amz-meta-`
	// if needed. If nil is passed, and if only a single source (of any
	// size) is provided in the ComposeObject call, then metadata from the
	// source is copied to the destination.
	UserMetadata map[string]string
	// UserMetadata is only set on the destination if ReplaceMetadata is
	// true; otherwise UserMetadata is ignored and src.UserMetadata is
	// preserved.
	// NOTE: if you set ReplaceMetadata to true and no metadata is present
	// in UserMetadata, your destination object will not have any metadata
	// set.
	ReplaceMetadata bool

	// `UserTags` is the user-defined object tags to be set on the
	// destination. This field is used only if `ReplaceTags` is set to
	// true; otherwise it is ignored.
	UserTags    map[string]string
	ReplaceTags bool

	// Specifies whether you want to apply a Legal Hold to the copied object.
	LegalHold LegalHoldStatus

	// Object Retention related fields
	Mode            RetentionMode
	RetainUntilDate time.Time

	Size int64 // Needs to be specified if progress bar is specified.
	// Progress of the entire copy operation will be sent here.
	Progress io.Reader
}
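// Illustrative sketch (added for exposition, not part of the original file):
// a typical destination for ComposeObject that replaces metadata on the
// result. The bucket and object names here are hypothetical.
//
//	dst := CopyDestOptions{
//		Bucket:          "my-bucket",
//		Object:          "composed.bin",
//		ReplaceMetadata: true,
//		UserMetadata:    map[string]string{"origin": "compose"},
//	}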

// Process custom-metadata to remove a `x-amz-meta-` prefix if
// present, and drop duplicate keys (after this prefix removal) so
// that each key appears only once.
func filterCustomMeta(userMeta map[string]string) map[string]string {
	m := make(map[string]string)
	for k, v := range userMeta {
		if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
			k = k[len("x-amz-meta-"):]
		}
		if _, ok := m[k]; ok {
			// Key already present after prefix removal; keep the
			// first value encountered.
			continue
		}
		m[k] = v
	}
	return m
}
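// For example (illustrative, not in the original source), the input
//
//	map[string]string{"x-amz-meta-project": "alpha", "owner": "ops"}
//
// becomes {"project": "alpha", "owner": "ops"}. Note that when two input
// keys collide after prefix removal (e.g. "x-amz-meta-project" and
// "project"), Go's randomized map iteration order makes it unspecified
// which value survives.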

// Marshal converts all the CopyDestOptions into their
// equivalent HTTP header representation
func (opts CopyDestOptions) Marshal(header http.Header) {
	const replaceDirective = "REPLACE"
	if opts.ReplaceTags {
		header.Set(amzTaggingHeaderDirective, replaceDirective)
		if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
			header.Set(amzTaggingHeader, tags)
		}
	}

	if opts.LegalHold != LegalHoldStatus("") {
		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
	}

	if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
		header.Set(amzLockMode, opts.Mode.String())
		header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
	}

	if opts.Encryption != nil {
		opts.Encryption.Marshal(header)
	}

	if opts.ReplaceMetadata {
		header.Set("x-amz-metadata-directive", replaceDirective)
		for k, v := range filterCustomMeta(opts.UserMetadata) {
			if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
				header.Set(k, v)
			} else {
				header.Set("x-amz-meta-"+k, v)
			}
		}
	}
}
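// For instance (illustrative, not in the original source), marshalling a
// destination with ReplaceMetadata set and UserMetadata
// {"project": "alpha"} yields headers equivalent to:
//
//	x-amz-metadata-directive: REPLACE
//	x-amz-meta-project: alpha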

// validate checks that the CopyDestOptions are well formed.
func (opts CopyDestOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Progress != nil && opts.Size < 0 {
		return errInvalidArgument("For the progress bar to work, the effective size needs to be specified")
	}
	return nil
}

// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
type CopySrcOptions struct {
	Bucket, Object       string
	VersionID            string
	MatchETag            string
	NoMatchETag          string
	MatchModifiedSince   time.Time
	MatchUnmodifiedSince time.Time
	MatchRange           bool
	Start, End           int64
	Encryption           encrypt.ServerSide
}
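// Illustrative sketch (added for exposition, not part of the original file):
// a source restricted to the first mebibyte of a hypothetical object.
// MatchRange must be set for Start/End to take effect, and End is an
// inclusive byte offset.
//
//	src := CopySrcOptions{
//		Bucket:     "my-bucket",
//		Object:     "large-object.bin",
//		MatchRange: true,
//		Start:      0,
//		End:        1<<20 - 1,
//	}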

// Marshal converts all the CopySrcOptions into their
// equivalent HTTP header representation
func (opts CopySrcOptions) Marshal(header http.Header) {
	// Set the source header
	header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
	if opts.VersionID != "" {
		header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
	}

	if opts.MatchETag != "" {
		header.Set("x-amz-copy-source-if-match", opts.MatchETag)
	}
	if opts.NoMatchETag != "" {
		header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
	}

	if !opts.MatchModifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
	}
	if !opts.MatchUnmodifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
	}

	if opts.Encryption != nil {
		encrypt.SSECopy(opts.Encryption).Marshal(header)
	}
}

func (opts CopySrcOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Start > opts.End || opts.Start < 0 {
		return errInvalidArgument("start must be non-negative, and start must be at most end.")
	}
	return nil
}

// Low-level implementation of the CopyObject API, supports only up to 5GiB worth of copy.
func (c *Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
	metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions,
) (ObjectInfo, error) {
	// Build headers.
	headers := make(http.Header)

	// Set all the metadata headers.
	for k, v := range metadata {
		headers.Set(k, v)
	}
	if !dstOpts.Internal.ReplicationStatus.Empty() {
		headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
	}
	if !dstOpts.Internal.SourceMTime.IsZero() {
		headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
	}
	if dstOpts.Internal.SourceETag != "" {
		headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
	}
	if dstOpts.Internal.ReplicationRequest {
		headers.Set(minIOBucketReplicationRequest, "true")
	}
	if !dstOpts.Internal.LegalholdTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationObjectLegalHoldTimestamp, dstOpts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
	}
	if !dstOpts.Internal.RetentionTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationObjectRetentionTimestamp, dstOpts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
	}
	if !dstOpts.Internal.TaggingTimestamp.IsZero() {
		headers.Set(minIOBucketReplicationTaggingTimestamp, dstOpts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
	}

	if len(dstOpts.UserTags) != 0 {
		headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
	}

	reqMetadata := requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
	}
	if dstOpts.Internal.SourceVersionID != "" {
		if dstOpts.Internal.SourceVersionID != nullVersionID {
			if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
				return ObjectInfo{}, errInvalidArgument(err.Error())
			}
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Set the source header
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
	if srcOpts.VersionID != "" {
		headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
	}
	// Send the copy-object request
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return ObjectInfo{}, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
	}

	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return ObjectInfo{}, err
	}

	objInfo := ObjectInfo{
		Key:          destObject,
		ETag:         strings.Trim(cpObjRes.ETag, "\""),
		LastModified: cpObjRes.LastModified,
	}
	return objInfo, nil
}

func (c *Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
	partID int, startOffset int64, length int64, metadata map[string]string,
) (p CompletePart, err error) {
	headers := make(http.Header)

	// Set source
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))

	if startOffset < 0 {
		return p, errInvalidArgument("startOffset must be non-negative")
	}

	if length >= 0 {
		headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
	}

	for k, v := range metadata {
		headers.Set(k, v)
	}

	queryValues := make(url.Values)
	queryValues.Set("partNumber", strconv.Itoa(partID))
	queryValues.Set("uploadId", uploadID)

	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
		queryValues:  queryValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, destBucket, destObject)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partID, cpObjRes.ETag
	return p, nil
}

// uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c *Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
	headers http.Header,
) (p CompletePart, err error) {
	// Build query parameters
	urlValues := make(url.Values)
	urlValues.Set("partNumber", strconv.Itoa(partNumber))
	urlValues.Set("uploadId", uploadID)

	// Send upload-part-copy request
	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   bucket,
		objectName:   object,
		customHeader: headers,
		queryValues:  urlValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return p, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, bucket, object)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
	return p, nil
}

// ComposeObject - creates an object using server-side copying
// of existing objects. It takes a list of source objects (with optional offsets)
// and concatenates them into a new object using only server-side copying
// operations. Optionally takes a progress reader hook for applications to
// look at current progress.
func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
	if len(srcs) < 1 || len(srcs) > maxPartsCount {
		return UploadInfo{}, errInvalidArgument("There must be at least one and up to 10000 source objects.")
	}

	for _, src := range srcs {
		if err := src.validate(); err != nil {
			return UploadInfo{}, err
		}
	}

	if err := dst.validate(); err != nil {
		return UploadInfo{}, err
	}

	srcObjectInfos := make([]ObjectInfo, len(srcs))
	srcObjectSizes := make([]int64, len(srcs))
	var totalSize, totalParts int64
	var err error
	for i, src := range srcs {
		opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
		srcObjectInfos[i], err = c.StatObject(ctx, src.Bucket, src.Object, opts)
		if err != nil {
			return UploadInfo{}, err
		}

		srcCopySize := srcObjectInfos[i].Size
		// Check if a segment is specified, and if so, is the
		// segment within object bounds?
		if src.MatchRange {
			// Since a range is specified, validate() has ensured
			//    0 <= src.Start <= src.End
			// so the only invalid case to check is:
			if src.End >= srcCopySize || src.Start < 0 {
				return UploadInfo{}, errInvalidArgument(
					fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
						i, src.Start, src.End, srcCopySize))
			}
			srcCopySize = src.End - src.Start + 1
		}

		// Only the last source may be less than `absMinPartSize`
		if srcCopySize < absMinPartSize && i < len(srcs)-1 {
			return UploadInfo{}, errInvalidArgument(
				fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
		}

		// Is data to copy too large?
		totalSize += srcCopySize
		if totalSize > maxMultipartPutObjectSize {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
		}

		// record source size
		srcObjectSizes[i] = srcCopySize

		// calculate parts needed for current source
		totalParts += partsRequired(srcCopySize)
		// Do we need more parts than we are allowed?
		if totalParts > maxPartsCount {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
				"Your proposed compose object requires more than %d parts", maxPartsCount))
		}
	}

	// Single source object case (i.e. when only one source is
	// involved, it is being copied wholly and is at most 5GiB in
	// size; empty files are also supported).
	if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
		return c.CopyObject(ctx, dst, srcs[0])
	}

	// Now, handle multipart-copy cases.

	// 1. Ensure that the object has not been changed while
	//    we are copying data.
	for i := range srcs {
		srcs[i].MatchETag = srcObjectInfos[i].ETag
	}

	// 2. Initiate a new multipart upload.

	// Set user-metadata on the destination object. If ReplaceMetadata is
	// not set, metadata is taken from the first source object.
	var userMeta map[string]string
	if dst.ReplaceMetadata {
		userMeta = dst.UserMetadata
	} else {
		userMeta = srcObjectInfos[0].UserMetadata
	}

	var userTags map[string]string
	if dst.ReplaceTags {
		userTags = dst.UserTags
	} else {
		userTags = srcObjectInfos[0].UserTags
	}

	uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
		ServerSideEncryption: dst.Encryption,
		UserMetadata:         userMeta,
		UserTags:             userTags,
		Mode:                 dst.Mode,
		RetainUntilDate:      dst.RetainUntilDate,
		LegalHold:            dst.LegalHold,
	})
	if err != nil {
		return UploadInfo{}, err
	}

	// 3. Perform copy part uploads
	objParts := []CompletePart{}
	partIndex := 1
	for i, src := range srcs {
		h := make(http.Header)
		src.Marshal(h)
		if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
			dst.Encryption.Marshal(h)
		}

		// calculate start/end indices of parts after
		// splitting.
		startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
		for j, start := range startIdx {
			end := endIdx[j]

			// Add (or reset) source range header for
			// upload part copy request.
			h.Set("x-amz-copy-source-range",
				fmt.Sprintf("bytes=%d-%d", start, end))

			// make upload-part-copy request
			complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
				dst.Object, uploadID, partIndex, h)
			if err != nil {
				return UploadInfo{}, err
			}
			if dst.Progress != nil {
				io.CopyN(io.Discard, dst.Progress, end-start+1)
			}
			objParts = append(objParts, complPart)
			partIndex++
		}
	}

	// 4. Make final complete-multipart request.
	uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
		completeMultipartUpload{Parts: objParts}, PutObjectOptions{ServerSideEncryption: dst.Encryption})
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalSize
	return uploadInfo, nil
}
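
// composeExample is an illustrative sketch added for exposition; it is not
// part of the original file. It shows typical ComposeObject usage:
// concatenating two hypothetical source objects into one destination using
// only server-side copying. The bucket and object names are made up.
func composeExample(ctx context.Context, c *Client) error {
	srcs := []CopySrcOptions{
		{Bucket: "my-bucket", Object: "part-1"},
		{Bucket: "my-bucket", Object: "part-2"},
	}
	dst := CopyDestOptions{
		Bucket: "my-bucket",
		Object: "joined",
	}
	_, err := c.ComposeObject(ctx, dst, srcs...)
	return err
}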

// partsRequired computes the number of parts needed to copy `size`
// bytes, using a per-part size of
// maxMultipartPutObjectSize / (maxPartsCount - 1) and rounding up.
func partsRequired(size int64) int64 {
	maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
	r := size / int64(maxPartSize)
	if size%int64(maxPartSize) > 0 {
		r++
	}
	return r
}
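// Worked example (illustrative, not in the original source): with
// maxMultipartPutObjectSize = 5 TiB and maxPartsCount = 10000, the per-part
// size is 5 TiB / 9999 ≈ 549,810,794 bytes (about 524 MiB). A 1 GiB source
// therefore needs 2 parts, and a full 5 TiB compose needs 10000 parts,
// exactly at the maxPartsCount limit.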

// calculateEvenSplits - computes splits for a source and returns
// start and end index slices. Splits happen evenly to be sure that no
// part is less than 5MiB, as that could fail the multipart request if
// it is not the last part.
func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
	if size == 0 {
		return
	}

	reqParts := partsRequired(size)
	startIndex = make([]int64, reqParts)
	endIndex = make([]int64, reqParts)
	// Compute number of required parts `k`, as:
	//
	// k = ceiling(size / copyPartSize)
	//
	// Now, distribute the `size` bytes in the source into
	// k parts as evenly as possible:
	//
	// r parts sized (q+1) bytes, and
	// (k - r) parts sized q bytes, where
	//
	// size = q * k + r (by simple division of size by k,
	// so that 0 <= r < k)
	//
	start := src.Start
	if start == -1 {
		start = 0
	}
	quot, rem := size/reqParts, size%reqParts
	nextStart := start
	for j := int64(0); j < reqParts; j++ {
		curPartSize := quot
		if j < rem {
			curPartSize++
		}

		cStart := nextStart
		cEnd := cStart + curPartSize - 1
		nextStart = cEnd + 1

		startIndex[j], endIndex[j] = cStart, cEnd
	}
	return
}
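
// Worked example (illustrative, not in the original source): if a source of
// size 10 bytes hypothetically required 3 parts, then quot = 3 and rem = 1,
// giving one part of 4 bytes followed by two parts of 3 bytes, i.e. the
// ranges [0,3], [4,6], [7,9]. Real part sizes are far larger, but the
// distribution is the same: the first `rem` parts each carry one extra
// byte, so no part ends up undersized.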