api-put-object-fan-out.go (4906B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2023 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "context" 22 "encoding/json" 23 "errors" 24 "io" 25 "mime/multipart" 26 "net/http" 27 "strconv" 28 "strings" 29 "time" 30 31 "github.com/minio/minio-go/v7/pkg/encrypt" 32 ) 33 34 // PutObjectFanOutEntry is per object entry fan-out metadata 35 type PutObjectFanOutEntry struct { 36 Key string `json:"key"` 37 UserMetadata map[string]string `json:"metadata,omitempty"` 38 UserTags map[string]string `json:"tags,omitempty"` 39 ContentType string `json:"contentType,omitempty"` 40 ContentEncoding string `json:"contentEncoding,omitempty"` 41 ContentDisposition string `json:"contentDisposition,omitempty"` 42 ContentLanguage string `json:"contentLanguage,omitempty"` 43 CacheControl string `json:"cacheControl,omitempty"` 44 Retention RetentionMode `json:"retention,omitempty"` 45 RetainUntilDate *time.Time `json:"retainUntil,omitempty"` 46 } 47 48 // PutObjectFanOutRequest this is the request structure sent 49 // to the server to fan-out the stream to multiple objects. 50 type PutObjectFanOutRequest struct { 51 Entries []PutObjectFanOutEntry 52 Checksum Checksum 53 SSE encrypt.ServerSide 54 } 55 56 // PutObjectFanOutResponse this is the response structure sent 57 // by the server upon success or failure for each object 58 // fan-out keys. Additionally, this response carries ETag, 59 // VersionID and LastModified for each object fan-out. 60 type PutObjectFanOutResponse struct { 61 Key string `json:"key"` 62 ETag string `json:"etag,omitempty"` 63 VersionID string `json:"versionId,omitempty"` 64 LastModified *time.Time `json:"lastModified,omitempty"` 65 Error error `json:"error,omitempty"` 66 } 67 68 // PutObjectFanOut - is a variant of PutObject instead of writing a single object from a single 69 // stream multiple objects are written, defined via a list of PutObjectFanOutRequests. Each entry 70 // in PutObjectFanOutRequest carries an object keyname and its relevant metadata if any. `Key` is 71 // mandatory, rest of the other options in PutObjectFanOutRequest are optional. 72 func (c *Client) PutObjectFanOut(ctx context.Context, bucket string, fanOutData io.Reader, fanOutReq PutObjectFanOutRequest) ([]PutObjectFanOutResponse, error) { 73 if len(fanOutReq.Entries) == 0 { 74 return nil, errInvalidArgument("fan out requests cannot be empty") 75 } 76 77 policy := NewPostPolicy() 78 policy.SetBucket(bucket) 79 policy.SetKey(strconv.FormatInt(time.Now().UnixNano(), 16)) 80 81 // Expires in 15 minutes. 82 policy.SetExpires(time.Now().UTC().Add(15 * time.Minute)) 83 84 // Set encryption headers if any. 85 policy.SetEncryption(fanOutReq.SSE) 86 87 // Set checksum headers if any. 88 policy.SetChecksum(fanOutReq.Checksum) 89 90 url, formData, err := c.PresignedPostPolicy(ctx, policy) 91 if err != nil { 92 return nil, err 93 } 94 95 r, w := io.Pipe() 96 97 req, err := http.NewRequest(http.MethodPost, url.String(), r) 98 if err != nil { 99 w.Close() 100 return nil, err 101 } 102 103 var b strings.Builder 104 enc := json.NewEncoder(&b) 105 for _, req := range fanOutReq.Entries { 106 if req.Key == "" { 107 w.Close() 108 return nil, errors.New("PutObjectFanOutRequest.Key is mandatory and cannot be empty") 109 } 110 if err = enc.Encode(&req); err != nil { 111 w.Close() 112 return nil, err 113 } 114 } 115 116 mwriter := multipart.NewWriter(w) 117 req.Header.Add("Content-Type", mwriter.FormDataContentType()) 118 119 go func() { 120 defer w.Close() 121 defer mwriter.Close() 122 123 for k, v := range formData { 124 if err := mwriter.WriteField(k, v); err != nil { 125 return 126 } 127 } 128 129 if err := mwriter.WriteField("x-minio-fanout-list", b.String()); err != nil { 130 return 131 } 132 133 mw, err := mwriter.CreateFormFile("file", "fanout-content") 134 if err != nil { 135 return 136 } 137 138 if _, err = io.Copy(mw, fanOutData); err != nil { 139 return 140 } 141 }() 142 143 resp, err := c.do(req) 144 if err != nil { 145 return nil, err 146 } 147 defer closeResponse(resp) 148 149 if resp.StatusCode != http.StatusOK { 150 return nil, httpRespToErrorResponse(resp, bucket, "fanout-content") 151 } 152 153 dec := json.NewDecoder(resp.Body) 154 fanOutResp := make([]PutObjectFanOutResponse, 0, len(fanOutReq.Entries)) 155 for dec.More() { 156 var m PutObjectFanOutResponse 157 if err = dec.Decode(&m); err != nil { 158 return nil, err 159 } 160 fanOutResp = append(fanOutResp, m) 161 } 162 163 return fanOutResp, nil 164 }