api-bucket-notification.go (8077B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2017-2020 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "bufio" 22 "bytes" 23 "context" 24 "encoding/xml" 25 "net/http" 26 "net/url" 27 "time" 28 29 jsoniter "github.com/json-iterator/go" 30 "github.com/minio/minio-go/v7/pkg/notification" 31 "github.com/minio/minio-go/v7/pkg/s3utils" 32 ) 33 34 // SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts. 35 func (c *Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error { 36 // Input validation. 37 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 38 return err 39 } 40 41 // Get resources properly escaped and lined up before 42 // using them in http request. 43 urlValues := make(url.Values) 44 urlValues.Set("notification", "") 45 46 notifBytes, err := xml.Marshal(&config) 47 if err != nil { 48 return err 49 } 50 51 notifBuffer := bytes.NewReader(notifBytes) 52 reqMetadata := requestMetadata{ 53 bucketName: bucketName, 54 queryValues: urlValues, 55 contentBody: notifBuffer, 56 contentLength: int64(len(notifBytes)), 57 contentMD5Base64: sumMD5Base64(notifBytes), 58 contentSHA256Hex: sum256Hex(notifBytes), 59 } 60 61 // Execute PUT to upload a new bucket notification. 62 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 63 defer closeResponse(resp) 64 if err != nil { 65 return err 66 } 67 if resp != nil { 68 if resp.StatusCode != http.StatusOK { 69 return httpRespToErrorResponse(resp, bucketName, "") 70 } 71 } 72 return nil 73 } 74 75 // RemoveAllBucketNotification - Remove bucket notification clears all previously specified config 76 func (c *Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error { 77 return c.SetBucketNotification(ctx, bucketName, notification.Configuration{}) 78 } 79 80 // GetBucketNotification returns current bucket notification configuration 81 func (c *Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) { 82 // Input validation. 83 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 84 return notification.Configuration{}, err 85 } 86 return c.getBucketNotification(ctx, bucketName) 87 } 88 89 // Request server for notification rules. 90 func (c *Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) { 91 urlValues := make(url.Values) 92 urlValues.Set("notification", "") 93 94 // Execute GET on bucket to list objects. 95 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 96 bucketName: bucketName, 97 queryValues: urlValues, 98 contentSHA256Hex: emptySHA256Hex, 99 }) 100 101 defer closeResponse(resp) 102 if err != nil { 103 return notification.Configuration{}, err 104 } 105 return processBucketNotificationResponse(bucketName, resp) 106 } 107 108 // processes the GetNotification http response from the server. 109 func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) { 110 if resp.StatusCode != http.StatusOK { 111 errResponse := httpRespToErrorResponse(resp, bucketName, "") 112 return notification.Configuration{}, errResponse 113 } 114 var bucketNotification notification.Configuration 115 err := xmlDecoder(resp.Body, &bucketNotification) 116 if err != nil { 117 return notification.Configuration{}, err 118 } 119 return bucketNotification, nil 120 } 121 122 // ListenNotification listen for all events, this is a MinIO specific API 123 func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info { 124 return c.ListenBucketNotification(ctx, "", prefix, suffix, events) 125 } 126 127 // ListenBucketNotification listen for bucket events, this is a MinIO specific API 128 func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info { 129 notificationInfoCh := make(chan notification.Info, 1) 130 const notificationCapacity = 4 * 1024 * 1024 131 notificationEventBuffer := make([]byte, notificationCapacity) 132 // Only success, start a routine to start reading line by line. 133 go func(notificationInfoCh chan<- notification.Info) { 134 defer close(notificationInfoCh) 135 136 // Validate the bucket name. 137 if bucketName != "" { 138 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 139 select { 140 case notificationInfoCh <- notification.Info{ 141 Err: err, 142 }: 143 case <-ctx.Done(): 144 } 145 return 146 } 147 } 148 149 // Check ARN partition to verify if listening bucket is supported 150 if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) { 151 select { 152 case notificationInfoCh <- notification.Info{ 153 Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"), 154 }: 155 case <-ctx.Done(): 156 } 157 return 158 } 159 160 // Continuously run and listen on bucket notification. 161 // Create a done channel to control 'ListObjects' go routine. 162 retryDoneCh := make(chan struct{}, 1) 163 164 // Indicate to our routine to exit cleanly upon return. 165 defer close(retryDoneCh) 166 167 // Prepare urlValues to pass into the request on every loop 168 urlValues := make(url.Values) 169 urlValues.Set("ping", "10") 170 urlValues.Set("prefix", prefix) 171 urlValues.Set("suffix", suffix) 172 urlValues["events"] = events 173 174 // Wait on the jitter retry loop. 175 for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) { 176 // Execute GET on bucket to list objects. 177 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 178 bucketName: bucketName, 179 queryValues: urlValues, 180 contentSHA256Hex: emptySHA256Hex, 181 }) 182 if err != nil { 183 select { 184 case notificationInfoCh <- notification.Info{ 185 Err: err, 186 }: 187 case <-ctx.Done(): 188 } 189 return 190 } 191 192 // Validate http response, upon error return quickly. 193 if resp.StatusCode != http.StatusOK { 194 errResponse := httpRespToErrorResponse(resp, bucketName, "") 195 select { 196 case notificationInfoCh <- notification.Info{ 197 Err: errResponse, 198 }: 199 case <-ctx.Done(): 200 } 201 return 202 } 203 204 // Initialize a new bufio scanner, to read line by line. 205 bio := bufio.NewScanner(resp.Body) 206 207 // Use a higher buffer to support unexpected 208 // caching done by proxies 209 bio.Buffer(notificationEventBuffer, notificationCapacity) 210 json := jsoniter.ConfigCompatibleWithStandardLibrary 211 212 // Unmarshal each line, returns marshaled values. 213 for bio.Scan() { 214 var notificationInfo notification.Info 215 if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil { 216 // Unexpected error during json unmarshal, send 217 // the error to caller for actionable as needed. 218 select { 219 case notificationInfoCh <- notification.Info{ 220 Err: err, 221 }: 222 case <-ctx.Done(): 223 return 224 } 225 closeResponse(resp) 226 continue 227 } 228 229 // Empty events pinged from the server 230 if len(notificationInfo.Records) == 0 && notificationInfo.Err == nil { 231 continue 232 } 233 234 // Send notificationInfo 235 select { 236 case notificationInfoCh <- notificationInfo: 237 case <-ctx.Done(): 238 closeResponse(resp) 239 return 240 } 241 } 242 243 if err = bio.Err(); err != nil { 244 select { 245 case notificationInfoCh <- notification.Info{ 246 Err: err, 247 }: 248 case <-ctx.Done(): 249 return 250 } 251 } 252 253 // Close current connection before looping further. 254 closeResponse(resp) 255 256 } 257 }(notificationInfoCh) 258 259 // Returns the notification info channel, for caller to start reading from. 260 return notificationInfoCh 261 }