
Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-putobject-snowball.go (5360B)

      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2021 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     18 package minio
     20 import (
     21 	"archive/tar"
     22 	"bufio"
     23 	"bytes"
     24 	"context"
     25 	"fmt"
     26 	"io"
     27 	"os"
     28 	"strings"
     29 	"sync"
     30 	"time"
     32 	"github.com/klauspost/compress/s2"
     33 )
     35 // SnowballOptions contains options for PutObjectsSnowball calls.
     36 type SnowballOptions struct {
     37 	// Opts is options applied to all objects.
     38 	Opts PutObjectOptions
     40 	// Processing options:
     42 	// InMemory specifies that all objects should be collected in memory
     43 	// before they are uploaded.
     44 	// If false a temporary file will be created.
     45 	InMemory bool
     47 	// Compress enabled content compression before upload.
     48 	// Compression will typically reduce memory and network usage,
     49 	// Compression can safely be enabled with MinIO hosts.
     50 	Compress bool
     51 }
     53 // SnowballObject contains information about a single object to be added to the snowball.
     54 type SnowballObject struct {
     55 	// Key is the destination key, including prefix.
     56 	Key string
     58 	// Size is the content size of this object.
     59 	Size int64
     61 	// Modtime to apply to the object.
     62 	// If Modtime is the zero value current time will be used.
     63 	ModTime time.Time
     65 	// Content of the object.
     66 	// Exactly 'Size' number of bytes must be provided.
     67 	Content io.Reader
     69 	// Close will be called when an object has finished processing.
     70 	// Note that if PutObjectsSnowball returns because of an error,
     71 	// objects not consumed from the input will NOT have been closed.
     72 	// Leave as nil for no callback.
     73 	Close func()
     74 }
     76 type nopReadSeekCloser struct {
     77 	io.ReadSeeker
     78 }
     80 func (n nopReadSeekCloser) Close() error {
     81 	return nil
     82 }
     84 // This is available as io.ReadSeekCloser from go1.16
     85 type readSeekCloser interface {
     86 	io.Reader
     87 	io.Closer
     88 	io.Seeker
     89 }
     91 // PutObjectsSnowball will put multiple objects with a single put call.
     92 // A (compressed) TAR file will be created which will contain multiple objects.
     93 // The key for each object will be used for the destination in the specified bucket.
     94 // Total size should be < 5TB.
     95 // This function blocks until 'objs' is closed and the content has been uploaded.
     96 func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) {
     97 	err = opts.Opts.validate()
     98 	if err != nil {
     99 		return err
    100 	}
    101 	var tmpWriter io.Writer
    102 	var getTmpReader func() (rc readSeekCloser, sz int64, err error)
    103 	if opts.InMemory {
    104 		b := bytes.NewBuffer(nil)
    105 		tmpWriter = b
    106 		getTmpReader = func() (readSeekCloser, int64, error) {
    107 			return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil
    108 		}
    109 	} else {
    110 		f, err := os.CreateTemp("", "s3-putsnowballobjects-*")
    111 		if err != nil {
    112 			return err
    113 		}
    114 		name := f.Name()
    115 		tmpWriter = f
    116 		var once sync.Once
    117 		defer once.Do(func() {
    118 			f.Close()
    119 		})
    120 		defer os.Remove(name)
    121 		getTmpReader = func() (readSeekCloser, int64, error) {
    122 			once.Do(func() {
    123 				f.Close()
    124 			})
    125 			f, err := os.Open(name)
    126 			if err != nil {
    127 				return nil, 0, err
    128 			}
    129 			st, err := f.Stat()
    130 			if err != nil {
    131 				return nil, 0, err
    132 			}
    133 			return f, st.Size(), nil
    134 		}
    135 	}
    136 	flush := func() error { return nil }
    137 	if !opts.Compress {
    138 		if !opts.InMemory {
    139 			// Insert buffer for writes.
    140 			buf := bufio.NewWriterSize(tmpWriter, 1<<20)
    141 			flush = buf.Flush
    142 			tmpWriter = buf
    143 		}
    144 	} else {
    145 		s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression())
    146 		flush = s2c.Close
    147 		defer s2c.Close()
    148 		tmpWriter = s2c
    149 	}
    150 	t := tar.NewWriter(tmpWriter)
    152 objectLoop:
    153 	for {
    154 		select {
    155 		case <-ctx.Done():
    156 			return ctx.Err()
    157 		case obj, ok := <-objs:
    158 			if !ok {
    159 				break objectLoop
    160 			}
    162 			closeObj := func() {}
    163 			if obj.Close != nil {
    164 				closeObj = obj.Close
    165 			}
    167 			// Trim accidental slash prefix.
    168 			obj.Key = strings.TrimPrefix(obj.Key, "/")
    169 			header := tar.Header{
    170 				Typeflag: tar.TypeReg,
    171 				Name:     obj.Key,
    172 				Size:     obj.Size,
    173 				ModTime:  obj.ModTime,
    174 				Format:   tar.FormatPAX,
    175 			}
    176 			if header.ModTime.IsZero() {
    177 				header.ModTime = time.Now().UTC()
    178 			}
    180 			if err := t.WriteHeader(&header); err != nil {
    181 				closeObj()
    182 				return err
    183 			}
    184 			n, err := io.Copy(t, obj.Content)
    185 			if err != nil {
    186 				closeObj()
    187 				return err
    188 			}
    189 			if n != obj.Size {
    190 				closeObj()
    191 				return io.ErrUnexpectedEOF
    192 			}
    193 			closeObj()
    194 		}
    195 	}
    196 	// Flush tar
    197 	err = t.Flush()
    198 	if err != nil {
    199 		return err
    200 	}
    201 	// Flush compression
    202 	err = flush()
    203 	if err != nil {
    204 		return err
    205 	}
    206 	if opts.Opts.UserMetadata == nil {
    207 		opts.Opts.UserMetadata = map[string]string{}
    208 	}
    209 	opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true"
    210 	opts.Opts.DisableMultipart = true
    211 	rc, sz, err := getTmpReader()
    212 	if err != nil {
    213 		return err
    214 	}
    215 	defer rc.Close()
    216 	rand := c.random.Uint64()
    217 	_, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts)
    218 	return err
    219 }