api-bucket-replication.go (8786B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2020 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "bytes" 22 "context" 23 "encoding/json" 24 "encoding/xml" 25 "io" 26 "net/http" 27 "net/url" 28 "time" 29 30 "github.com/google/uuid" 31 "github.com/minio/minio-go/v7/pkg/replication" 32 "github.com/minio/minio-go/v7/pkg/s3utils" 33 ) 34 35 // RemoveBucketReplication removes a replication config on an existing bucket. 36 func (c *Client) RemoveBucketReplication(ctx context.Context, bucketName string) error { 37 return c.removeBucketReplication(ctx, bucketName) 38 } 39 40 // SetBucketReplication sets a replication config on an existing bucket. 41 func (c *Client) SetBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error { 42 // Input validation. 43 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 44 return err 45 } 46 47 // If replication is empty then delete it. 48 if cfg.Empty() { 49 return c.removeBucketReplication(ctx, bucketName) 50 } 51 // Save the updated replication. 52 return c.putBucketReplication(ctx, bucketName, cfg) 53 } 54 55 // Saves a new bucket replication. 56 func (c *Client) putBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error { 57 // Get resources properly escaped and lined up before 58 // using them in http request. 59 urlValues := make(url.Values) 60 urlValues.Set("replication", "") 61 replication, err := xml.Marshal(cfg) 62 if err != nil { 63 return err 64 } 65 66 reqMetadata := requestMetadata{ 67 bucketName: bucketName, 68 queryValues: urlValues, 69 contentBody: bytes.NewReader(replication), 70 contentLength: int64(len(replication)), 71 contentMD5Base64: sumMD5Base64(replication), 72 } 73 74 // Execute PUT to upload a new bucket replication config. 75 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 76 defer closeResponse(resp) 77 if err != nil { 78 return err 79 } 80 81 if resp.StatusCode != http.StatusOK { 82 return httpRespToErrorResponse(resp, bucketName, "") 83 } 84 85 return nil 86 } 87 88 // Remove replication from a bucket. 89 func (c *Client) removeBucketReplication(ctx context.Context, bucketName string) error { 90 // Get resources properly escaped and lined up before 91 // using them in http request. 92 urlValues := make(url.Values) 93 urlValues.Set("replication", "") 94 95 // Execute DELETE on objectName. 96 resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{ 97 bucketName: bucketName, 98 queryValues: urlValues, 99 contentSHA256Hex: emptySHA256Hex, 100 }) 101 defer closeResponse(resp) 102 if err != nil { 103 return err 104 } 105 if resp.StatusCode != http.StatusOK { 106 return httpRespToErrorResponse(resp, bucketName, "") 107 } 108 return nil 109 } 110 111 // GetBucketReplication fetches bucket replication configuration.If config is not 112 // found, returns empty config with nil error. 113 func (c *Client) GetBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) { 114 // Input validation. 115 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 116 return cfg, err 117 } 118 bucketReplicationCfg, err := c.getBucketReplication(ctx, bucketName) 119 if err != nil { 120 errResponse := ToErrorResponse(err) 121 if errResponse.Code == "ReplicationConfigurationNotFoundError" { 122 return cfg, nil 123 } 124 return cfg, err 125 } 126 return bucketReplicationCfg, nil 127 } 128 129 // Request server for current bucket replication config. 130 func (c *Client) getBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) { 131 // Get resources properly escaped and lined up before 132 // using them in http request. 133 urlValues := make(url.Values) 134 urlValues.Set("replication", "") 135 136 // Execute GET on bucket to get replication config. 137 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 138 bucketName: bucketName, 139 queryValues: urlValues, 140 }) 141 142 defer closeResponse(resp) 143 if err != nil { 144 return cfg, err 145 } 146 147 if resp.StatusCode != http.StatusOK { 148 return cfg, httpRespToErrorResponse(resp, bucketName, "") 149 } 150 151 if err = xmlDecoder(resp.Body, &cfg); err != nil { 152 return cfg, err 153 } 154 155 return cfg, nil 156 } 157 158 // GetBucketReplicationMetrics fetches bucket replication status metrics 159 func (c *Client) GetBucketReplicationMetrics(ctx context.Context, bucketName string) (s replication.Metrics, err error) { 160 // Input validation. 161 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 162 return s, err 163 } 164 // Get resources properly escaped and lined up before 165 // using them in http request. 166 urlValues := make(url.Values) 167 urlValues.Set("replication-metrics", "") 168 169 // Execute GET on bucket to get replication config. 170 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 171 bucketName: bucketName, 172 queryValues: urlValues, 173 }) 174 175 defer closeResponse(resp) 176 if err != nil { 177 return s, err 178 } 179 180 if resp.StatusCode != http.StatusOK { 181 return s, httpRespToErrorResponse(resp, bucketName, "") 182 } 183 respBytes, err := io.ReadAll(resp.Body) 184 if err != nil { 185 return s, err 186 } 187 188 if err := json.Unmarshal(respBytes, &s); err != nil { 189 return s, err 190 } 191 return s, nil 192 } 193 194 // mustGetUUID - get a random UUID. 195 func mustGetUUID() string { 196 u, err := uuid.NewRandom() 197 if err != nil { 198 return "" 199 } 200 return u.String() 201 } 202 203 // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication 204 // is enabled in the replication config 205 func (c *Client) ResetBucketReplication(ctx context.Context, bucketName string, olderThan time.Duration) (rID string, err error) { 206 rID = mustGetUUID() 207 _, err = c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, "", rID) 208 if err != nil { 209 return rID, err 210 } 211 return rID, nil 212 } 213 214 // ResetBucketReplicationOnTarget kicks off replication of previously replicated objects if 215 // ExistingObjectReplication is enabled in the replication config 216 func (c *Client) ResetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string) (replication.ResyncTargetsInfo, error) { 217 return c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, tgtArn, mustGetUUID()) 218 } 219 220 // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication 221 // is enabled in the replication config 222 func (c *Client) resetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string, resetID string) (rinfo replication.ResyncTargetsInfo, err error) { 223 // Input validation. 224 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 225 return 226 } 227 // Get resources properly escaped and lined up before 228 // using them in http request. 229 urlValues := make(url.Values) 230 urlValues.Set("replication-reset", "") 231 if olderThan > 0 { 232 urlValues.Set("older-than", olderThan.String()) 233 } 234 if tgtArn != "" { 235 urlValues.Set("arn", tgtArn) 236 } 237 urlValues.Set("reset-id", resetID) 238 // Execute GET on bucket to get replication config. 239 resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{ 240 bucketName: bucketName, 241 queryValues: urlValues, 242 }) 243 244 defer closeResponse(resp) 245 if err != nil { 246 return rinfo, err 247 } 248 249 if resp.StatusCode != http.StatusOK { 250 return rinfo, httpRespToErrorResponse(resp, bucketName, "") 251 } 252 253 if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil { 254 return rinfo, err 255 } 256 return rinfo, nil 257 } 258 259 // GetBucketReplicationResyncStatus gets the status of replication resync 260 func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketName, arn string) (rinfo replication.ResyncTargetsInfo, err error) { 261 // Input validation. 262 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 263 return rinfo, err 264 } 265 // Get resources properly escaped and lined up before 266 // using them in http request. 267 urlValues := make(url.Values) 268 urlValues.Set("replication-reset-status", "") 269 if arn != "" { 270 urlValues.Set("arn", arn) 271 } 272 // Execute GET on bucket to get replication config. 273 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 274 bucketName: bucketName, 275 queryValues: urlValues, 276 }) 277 278 defer closeResponse(resp) 279 if err != nil { 280 return rinfo, err 281 } 282 283 if resp.StatusCode != http.StatusOK { 284 return rinfo, httpRespToErrorResponse(resp, bucketName, "") 285 } 286 287 if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil { 288 return rinfo, err 289 } 290 return rinfo, nil 291 }