gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-put-object-fan-out.go (4906B)


      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2023 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package minio
     19 
     20 import (
     21 	"context"
     22 	"encoding/json"
     23 	"errors"
     24 	"io"
     25 	"mime/multipart"
     26 	"net/http"
     27 	"strconv"
     28 	"strings"
     29 	"time"
     30 
     31 	"github.com/minio/minio-go/v7/pkg/encrypt"
     32 )
     33 
     34 // PutObjectFanOutEntry is per object entry fan-out metadata
     35 type PutObjectFanOutEntry struct {
     36 	Key                string            `json:"key"`
     37 	UserMetadata       map[string]string `json:"metadata,omitempty"`
     38 	UserTags           map[string]string `json:"tags,omitempty"`
     39 	ContentType        string            `json:"contentType,omitempty"`
     40 	ContentEncoding    string            `json:"contentEncoding,omitempty"`
     41 	ContentDisposition string            `json:"contentDisposition,omitempty"`
     42 	ContentLanguage    string            `json:"contentLanguage,omitempty"`
     43 	CacheControl       string            `json:"cacheControl,omitempty"`
     44 	Retention          RetentionMode     `json:"retention,omitempty"`
     45 	RetainUntilDate    *time.Time        `json:"retainUntil,omitempty"`
     46 }
     47 
     48 // PutObjectFanOutRequest this is the request structure sent
     49 // to the server to fan-out the stream to multiple objects.
     50 type PutObjectFanOutRequest struct {
     51 	Entries  []PutObjectFanOutEntry
     52 	Checksum Checksum
     53 	SSE      encrypt.ServerSide
     54 }
     55 
     56 // PutObjectFanOutResponse this is the response structure sent
     57 // by the server upon success or failure for each object
     58 // fan-out keys. Additionally, this response carries ETag,
     59 // VersionID and LastModified for each object fan-out.
     60 type PutObjectFanOutResponse struct {
     61 	Key          string     `json:"key"`
     62 	ETag         string     `json:"etag,omitempty"`
     63 	VersionID    string     `json:"versionId,omitempty"`
     64 	LastModified *time.Time `json:"lastModified,omitempty"`
     65 	Error        error      `json:"error,omitempty"`
     66 }
     67 
     68 // PutObjectFanOut - is a variant of PutObject instead of writing a single object from a single
     69 // stream multiple objects are written, defined via a list of PutObjectFanOutRequests. Each entry
     70 // in PutObjectFanOutRequest carries an object keyname and its relevant metadata if any. `Key` is
     71 // mandatory, rest of the other options in PutObjectFanOutRequest are optional.
     72 func (c *Client) PutObjectFanOut(ctx context.Context, bucket string, fanOutData io.Reader, fanOutReq PutObjectFanOutRequest) ([]PutObjectFanOutResponse, error) {
     73 	if len(fanOutReq.Entries) == 0 {
     74 		return nil, errInvalidArgument("fan out requests cannot be empty")
     75 	}
     76 
     77 	policy := NewPostPolicy()
     78 	policy.SetBucket(bucket)
     79 	policy.SetKey(strconv.FormatInt(time.Now().UnixNano(), 16))
     80 
     81 	// Expires in 15 minutes.
     82 	policy.SetExpires(time.Now().UTC().Add(15 * time.Minute))
     83 
     84 	// Set encryption headers if any.
     85 	policy.SetEncryption(fanOutReq.SSE)
     86 
     87 	// Set checksum headers if any.
     88 	policy.SetChecksum(fanOutReq.Checksum)
     89 
     90 	url, formData, err := c.PresignedPostPolicy(ctx, policy)
     91 	if err != nil {
     92 		return nil, err
     93 	}
     94 
     95 	r, w := io.Pipe()
     96 
     97 	req, err := http.NewRequest(http.MethodPost, url.String(), r)
     98 	if err != nil {
     99 		w.Close()
    100 		return nil, err
    101 	}
    102 
    103 	var b strings.Builder
    104 	enc := json.NewEncoder(&b)
    105 	for _, req := range fanOutReq.Entries {
    106 		if req.Key == "" {
    107 			w.Close()
    108 			return nil, errors.New("PutObjectFanOutRequest.Key is mandatory and cannot be empty")
    109 		}
    110 		if err = enc.Encode(&req); err != nil {
    111 			w.Close()
    112 			return nil, err
    113 		}
    114 	}
    115 
    116 	mwriter := multipart.NewWriter(w)
    117 	req.Header.Add("Content-Type", mwriter.FormDataContentType())
    118 
    119 	go func() {
    120 		defer w.Close()
    121 		defer mwriter.Close()
    122 
    123 		for k, v := range formData {
    124 			if err := mwriter.WriteField(k, v); err != nil {
    125 				return
    126 			}
    127 		}
    128 
    129 		if err := mwriter.WriteField("x-minio-fanout-list", b.String()); err != nil {
    130 			return
    131 		}
    132 
    133 		mw, err := mwriter.CreateFormFile("file", "fanout-content")
    134 		if err != nil {
    135 			return
    136 		}
    137 
    138 		if _, err = io.Copy(mw, fanOutData); err != nil {
    139 			return
    140 		}
    141 	}()
    142 
    143 	resp, err := c.do(req)
    144 	if err != nil {
    145 		return nil, err
    146 	}
    147 	defer closeResponse(resp)
    148 
    149 	if resp.StatusCode != http.StatusOK {
    150 		return nil, httpRespToErrorResponse(resp, bucket, "fanout-content")
    151 	}
    152 
    153 	dec := json.NewDecoder(resp.Body)
    154 	fanOutResp := make([]PutObjectFanOutResponse, 0, len(fanOutReq.Entries))
    155 	for dec.More() {
    156 		var m PutObjectFanOutResponse
    157 		if err = dec.Decode(&m); err != nil {
    158 			return nil, err
    159 		}
    160 		fanOutResp = append(fanOutResp, m)
    161 	}
    162 
    163 	return fanOutResp, nil
    164 }