gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-bucket-notification.go (8077B)


      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * Copyright 2017-2020 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package minio
     19 
     20 import (
     21 	"bufio"
     22 	"bytes"
     23 	"context"
     24 	"encoding/xml"
     25 	"net/http"
     26 	"net/url"
     27 	"time"
     28 
     29 	jsoniter "github.com/json-iterator/go"
     30 	"github.com/minio/minio-go/v7/pkg/notification"
     31 	"github.com/minio/minio-go/v7/pkg/s3utils"
     32 )
     33 
     34 // SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts.
     35 func (c *Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error {
     36 	// Input validation.
     37 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
     38 		return err
     39 	}
     40 
     41 	// Get resources properly escaped and lined up before
     42 	// using them in http request.
     43 	urlValues := make(url.Values)
     44 	urlValues.Set("notification", "")
     45 
     46 	notifBytes, err := xml.Marshal(&config)
     47 	if err != nil {
     48 		return err
     49 	}
     50 
     51 	notifBuffer := bytes.NewReader(notifBytes)
     52 	reqMetadata := requestMetadata{
     53 		bucketName:       bucketName,
     54 		queryValues:      urlValues,
     55 		contentBody:      notifBuffer,
     56 		contentLength:    int64(len(notifBytes)),
     57 		contentMD5Base64: sumMD5Base64(notifBytes),
     58 		contentSHA256Hex: sum256Hex(notifBytes),
     59 	}
     60 
     61 	// Execute PUT to upload a new bucket notification.
     62 	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
     63 	defer closeResponse(resp)
     64 	if err != nil {
     65 		return err
     66 	}
     67 	if resp != nil {
     68 		if resp.StatusCode != http.StatusOK {
     69 			return httpRespToErrorResponse(resp, bucketName, "")
     70 		}
     71 	}
     72 	return nil
     73 }
     74 
     75 // RemoveAllBucketNotification - Remove bucket notification clears all previously specified config
     76 func (c *Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error {
     77 	return c.SetBucketNotification(ctx, bucketName, notification.Configuration{})
     78 }
     79 
     80 // GetBucketNotification returns current bucket notification configuration
     81 func (c *Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) {
     82 	// Input validation.
     83 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
     84 		return notification.Configuration{}, err
     85 	}
     86 	return c.getBucketNotification(ctx, bucketName)
     87 }
     88 
     89 // Request server for notification rules.
     90 func (c *Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) {
     91 	urlValues := make(url.Values)
     92 	urlValues.Set("notification", "")
     93 
     94 	// Execute GET on bucket to list objects.
     95 	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
     96 		bucketName:       bucketName,
     97 		queryValues:      urlValues,
     98 		contentSHA256Hex: emptySHA256Hex,
     99 	})
    100 
    101 	defer closeResponse(resp)
    102 	if err != nil {
    103 		return notification.Configuration{}, err
    104 	}
    105 	return processBucketNotificationResponse(bucketName, resp)
    106 }
    107 
    108 // processes the GetNotification http response from the server.
    109 func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) {
    110 	if resp.StatusCode != http.StatusOK {
    111 		errResponse := httpRespToErrorResponse(resp, bucketName, "")
    112 		return notification.Configuration{}, errResponse
    113 	}
    114 	var bucketNotification notification.Configuration
    115 	err := xmlDecoder(resp.Body, &bucketNotification)
    116 	if err != nil {
    117 		return notification.Configuration{}, err
    118 	}
    119 	return bucketNotification, nil
    120 }
    121 
    122 // ListenNotification listen for all events, this is a MinIO specific API
    123 func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info {
    124 	return c.ListenBucketNotification(ctx, "", prefix, suffix, events)
    125 }
    126 
    127 // ListenBucketNotification listen for bucket events, this is a MinIO specific API
    128 func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info {
    129 	notificationInfoCh := make(chan notification.Info, 1)
    130 	const notificationCapacity = 4 * 1024 * 1024
    131 	notificationEventBuffer := make([]byte, notificationCapacity)
    132 	// Only success, start a routine to start reading line by line.
    133 	go func(notificationInfoCh chan<- notification.Info) {
    134 		defer close(notificationInfoCh)
    135 
    136 		// Validate the bucket name.
    137 		if bucketName != "" {
    138 			if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    139 				select {
    140 				case notificationInfoCh <- notification.Info{
    141 					Err: err,
    142 				}:
    143 				case <-ctx.Done():
    144 				}
    145 				return
    146 			}
    147 		}
    148 
    149 		// Check ARN partition to verify if listening bucket is supported
    150 		if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) {
    151 			select {
    152 			case notificationInfoCh <- notification.Info{
    153 				Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"),
    154 			}:
    155 			case <-ctx.Done():
    156 			}
    157 			return
    158 		}
    159 
    160 		// Continuously run and listen on bucket notification.
    161 		// Create a done channel to control 'ListObjects' go routine.
    162 		retryDoneCh := make(chan struct{}, 1)
    163 
    164 		// Indicate to our routine to exit cleanly upon return.
    165 		defer close(retryDoneCh)
    166 
    167 		// Prepare urlValues to pass into the request on every loop
    168 		urlValues := make(url.Values)
    169 		urlValues.Set("ping", "10")
    170 		urlValues.Set("prefix", prefix)
    171 		urlValues.Set("suffix", suffix)
    172 		urlValues["events"] = events
    173 
    174 		// Wait on the jitter retry loop.
    175 		for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) {
    176 			// Execute GET on bucket to list objects.
    177 			resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
    178 				bucketName:       bucketName,
    179 				queryValues:      urlValues,
    180 				contentSHA256Hex: emptySHA256Hex,
    181 			})
    182 			if err != nil {
    183 				select {
    184 				case notificationInfoCh <- notification.Info{
    185 					Err: err,
    186 				}:
    187 				case <-ctx.Done():
    188 				}
    189 				return
    190 			}
    191 
    192 			// Validate http response, upon error return quickly.
    193 			if resp.StatusCode != http.StatusOK {
    194 				errResponse := httpRespToErrorResponse(resp, bucketName, "")
    195 				select {
    196 				case notificationInfoCh <- notification.Info{
    197 					Err: errResponse,
    198 				}:
    199 				case <-ctx.Done():
    200 				}
    201 				return
    202 			}
    203 
    204 			// Initialize a new bufio scanner, to read line by line.
    205 			bio := bufio.NewScanner(resp.Body)
    206 
    207 			// Use a higher buffer to support unexpected
    208 			// caching done by proxies
    209 			bio.Buffer(notificationEventBuffer, notificationCapacity)
    210 			json := jsoniter.ConfigCompatibleWithStandardLibrary
    211 
    212 			// Unmarshal each line, returns marshaled values.
    213 			for bio.Scan() {
    214 				var notificationInfo notification.Info
    215 				if err = json.Unmarshal(bio.Bytes(), &notificationInfo); err != nil {
    216 					// Unexpected error during json unmarshal, send
    217 					// the error to caller for actionable as needed.
    218 					select {
    219 					case notificationInfoCh <- notification.Info{
    220 						Err: err,
    221 					}:
    222 					case <-ctx.Done():
    223 						return
    224 					}
    225 					closeResponse(resp)
    226 					continue
    227 				}
    228 
    229 				// Empty events pinged from the server
    230 				if len(notificationInfo.Records) == 0 && notificationInfo.Err == nil {
    231 					continue
    232 				}
    233 
    234 				// Send notificationInfo
    235 				select {
    236 				case notificationInfoCh <- notificationInfo:
    237 				case <-ctx.Done():
    238 					closeResponse(resp)
    239 					return
    240 				}
    241 			}
    242 
    243 			if err = bio.Err(); err != nil {
    244 				select {
    245 				case notificationInfoCh <- notification.Info{
    246 					Err: err,
    247 				}:
    248 				case <-ctx.Done():
    249 					return
    250 				}
    251 			}
    252 
    253 			// Close current connection before looping further.
    254 			closeResponse(resp)
    255 
    256 		}
    257 	}(notificationInfoCh)
    258 
    259 	// Returns the notification info channel, for caller to start reading from.
    260 	return notificationInfoCh
    261 }