gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-bucket-replication.go (8786B)


      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2020 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package minio
     19 
     20 import (
     21 	"bytes"
     22 	"context"
     23 	"encoding/json"
     24 	"encoding/xml"
     25 	"io"
     26 	"net/http"
     27 	"net/url"
     28 	"time"
     29 
     30 	"github.com/google/uuid"
     31 	"github.com/minio/minio-go/v7/pkg/replication"
     32 	"github.com/minio/minio-go/v7/pkg/s3utils"
     33 )
     34 
     35 // RemoveBucketReplication removes a replication config on an existing bucket.
     36 func (c *Client) RemoveBucketReplication(ctx context.Context, bucketName string) error {
     37 	return c.removeBucketReplication(ctx, bucketName)
     38 }
     39 
     40 // SetBucketReplication sets a replication config on an existing bucket.
     41 func (c *Client) SetBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
     42 	// Input validation.
     43 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
     44 		return err
     45 	}
     46 
     47 	// If replication is empty then delete it.
     48 	if cfg.Empty() {
     49 		return c.removeBucketReplication(ctx, bucketName)
     50 	}
     51 	// Save the updated replication.
     52 	return c.putBucketReplication(ctx, bucketName, cfg)
     53 }
     54 
     55 // Saves a new bucket replication.
     56 func (c *Client) putBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
     57 	// Get resources properly escaped and lined up before
     58 	// using them in http request.
     59 	urlValues := make(url.Values)
     60 	urlValues.Set("replication", "")
     61 	replication, err := xml.Marshal(cfg)
     62 	if err != nil {
     63 		return err
     64 	}
     65 
     66 	reqMetadata := requestMetadata{
     67 		bucketName:       bucketName,
     68 		queryValues:      urlValues,
     69 		contentBody:      bytes.NewReader(replication),
     70 		contentLength:    int64(len(replication)),
     71 		contentMD5Base64: sumMD5Base64(replication),
     72 	}
     73 
     74 	// Execute PUT to upload a new bucket replication config.
     75 	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
     76 	defer closeResponse(resp)
     77 	if err != nil {
     78 		return err
     79 	}
     80 
     81 	if resp.StatusCode != http.StatusOK {
     82 		return httpRespToErrorResponse(resp, bucketName, "")
     83 	}
     84 
     85 	return nil
     86 }
     87 
     88 // Remove replication from a bucket.
     89 func (c *Client) removeBucketReplication(ctx context.Context, bucketName string) error {
     90 	// Get resources properly escaped and lined up before
     91 	// using them in http request.
     92 	urlValues := make(url.Values)
     93 	urlValues.Set("replication", "")
     94 
     95 	// Execute DELETE on objectName.
     96 	resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{
     97 		bucketName:       bucketName,
     98 		queryValues:      urlValues,
     99 		contentSHA256Hex: emptySHA256Hex,
    100 	})
    101 	defer closeResponse(resp)
    102 	if err != nil {
    103 		return err
    104 	}
    105 	if resp.StatusCode != http.StatusOK {
    106 		return httpRespToErrorResponse(resp, bucketName, "")
    107 	}
    108 	return nil
    109 }
    110 
    111 // GetBucketReplication fetches bucket replication configuration.If config is not
    112 // found, returns empty config with nil error.
    113 func (c *Client) GetBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
    114 	// Input validation.
    115 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    116 		return cfg, err
    117 	}
    118 	bucketReplicationCfg, err := c.getBucketReplication(ctx, bucketName)
    119 	if err != nil {
    120 		errResponse := ToErrorResponse(err)
    121 		if errResponse.Code == "ReplicationConfigurationNotFoundError" {
    122 			return cfg, nil
    123 		}
    124 		return cfg, err
    125 	}
    126 	return bucketReplicationCfg, nil
    127 }
    128 
    129 // Request server for current bucket replication config.
    130 func (c *Client) getBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
    131 	// Get resources properly escaped and lined up before
    132 	// using them in http request.
    133 	urlValues := make(url.Values)
    134 	urlValues.Set("replication", "")
    135 
    136 	// Execute GET on bucket to get replication config.
    137 	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
    138 		bucketName:  bucketName,
    139 		queryValues: urlValues,
    140 	})
    141 
    142 	defer closeResponse(resp)
    143 	if err != nil {
    144 		return cfg, err
    145 	}
    146 
    147 	if resp.StatusCode != http.StatusOK {
    148 		return cfg, httpRespToErrorResponse(resp, bucketName, "")
    149 	}
    150 
    151 	if err = xmlDecoder(resp.Body, &cfg); err != nil {
    152 		return cfg, err
    153 	}
    154 
    155 	return cfg, nil
    156 }
    157 
    158 // GetBucketReplicationMetrics fetches bucket replication status metrics
    159 func (c *Client) GetBucketReplicationMetrics(ctx context.Context, bucketName string) (s replication.Metrics, err error) {
    160 	// Input validation.
    161 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    162 		return s, err
    163 	}
    164 	// Get resources properly escaped and lined up before
    165 	// using them in http request.
    166 	urlValues := make(url.Values)
    167 	urlValues.Set("replication-metrics", "")
    168 
    169 	// Execute GET on bucket to get replication config.
    170 	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
    171 		bucketName:  bucketName,
    172 		queryValues: urlValues,
    173 	})
    174 
    175 	defer closeResponse(resp)
    176 	if err != nil {
    177 		return s, err
    178 	}
    179 
    180 	if resp.StatusCode != http.StatusOK {
    181 		return s, httpRespToErrorResponse(resp, bucketName, "")
    182 	}
    183 	respBytes, err := io.ReadAll(resp.Body)
    184 	if err != nil {
    185 		return s, err
    186 	}
    187 
    188 	if err := json.Unmarshal(respBytes, &s); err != nil {
    189 		return s, err
    190 	}
    191 	return s, nil
    192 }
    193 
    194 // mustGetUUID - get a random UUID.
    195 func mustGetUUID() string {
    196 	u, err := uuid.NewRandom()
    197 	if err != nil {
    198 		return ""
    199 	}
    200 	return u.String()
    201 }
    202 
    203 // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
    204 // is enabled in the replication config
    205 func (c *Client) ResetBucketReplication(ctx context.Context, bucketName string, olderThan time.Duration) (rID string, err error) {
    206 	rID = mustGetUUID()
    207 	_, err = c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, "", rID)
    208 	if err != nil {
    209 		return rID, err
    210 	}
    211 	return rID, nil
    212 }
    213 
    214 // ResetBucketReplicationOnTarget kicks off replication of previously replicated objects if
    215 // ExistingObjectReplication is enabled in the replication config
    216 func (c *Client) ResetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string) (replication.ResyncTargetsInfo, error) {
    217 	return c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, tgtArn, mustGetUUID())
    218 }
    219 
    220 // ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
    221 // is enabled in the replication config
    222 func (c *Client) resetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string, resetID string) (rinfo replication.ResyncTargetsInfo, err error) {
    223 	// Input validation.
    224 	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
    225 		return
    226 	}
    227 	// Get resources properly escaped and lined up before
    228 	// using them in http request.
    229 	urlValues := make(url.Values)
    230 	urlValues.Set("replication-reset", "")
    231 	if olderThan > 0 {
    232 		urlValues.Set("older-than", olderThan.String())
    233 	}
    234 	if tgtArn != "" {
    235 		urlValues.Set("arn", tgtArn)
    236 	}
    237 	urlValues.Set("reset-id", resetID)
    238 	// Execute GET on bucket to get replication config.
    239 	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
    240 		bucketName:  bucketName,
    241 		queryValues: urlValues,
    242 	})
    243 
    244 	defer closeResponse(resp)
    245 	if err != nil {
    246 		return rinfo, err
    247 	}
    248 
    249 	if resp.StatusCode != http.StatusOK {
    250 		return rinfo, httpRespToErrorResponse(resp, bucketName, "")
    251 	}
    252 
    253 	if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil {
    254 		return rinfo, err
    255 	}
    256 	return rinfo, nil
    257 }
    258 
    259 // GetBucketReplicationResyncStatus gets the status of replication resync
    260 func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketName, arn string) (rinfo replication.ResyncTargetsInfo, err error) {
    261 	// Input validation.
    262 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    263 		return rinfo, err
    264 	}
    265 	// Get resources properly escaped and lined up before
    266 	// using them in http request.
    267 	urlValues := make(url.Values)
    268 	urlValues.Set("replication-reset-status", "")
    269 	if arn != "" {
    270 		urlValues.Set("arn", arn)
    271 	}
    272 	// Execute GET on bucket to get replication config.
    273 	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
    274 		bucketName:  bucketName,
    275 		queryValues: urlValues,
    276 	})
    277 
    278 	defer closeResponse(resp)
    279 	if err != nil {
    280 		return rinfo, err
    281 	}
    282 
    283 	if resp.StatusCode != http.StatusOK {
    284 		return rinfo, httpRespToErrorResponse(resp, bucketName, "")
    285 	}
    286 
    287 	if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil {
    288 		return rinfo, err
    289 	}
    290 	return rinfo, nil
    291 }