api-get-object.go (20225B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2015-2020 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "context" 22 "errors" 23 "fmt" 24 "io" 25 "net/http" 26 "sync" 27 28 "github.com/minio/minio-go/v7/pkg/s3utils" 29 ) 30 31 // GetObject wrapper function that accepts a request context 32 func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (*Object, error) { 33 // Input validation. 34 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 35 return nil, err 36 } 37 if err := s3utils.CheckValidObjectName(objectName); err != nil { 38 return nil, err 39 } 40 41 gctx, cancel := context.WithCancel(ctx) 42 43 // Detect if snowball is server location we are talking to. 44 var snowball bool 45 if location, ok := c.bucketLocCache.Get(bucketName); ok { 46 snowball = location == "snowball" 47 } 48 49 var ( 50 err error 51 httpReader io.ReadCloser 52 objectInfo ObjectInfo 53 totalRead int 54 ) 55 56 // Create request channel. 57 reqCh := make(chan getRequest) 58 // Create response channel. 59 resCh := make(chan getResponse) 60 61 // This routine feeds partial object data as and when the caller reads. 62 go func() { 63 defer close(resCh) 64 defer func() { 65 // Close the http response body before returning. 66 // This ends the connection with the server. 67 if httpReader != nil { 68 httpReader.Close() 69 } 70 }() 71 defer cancel() 72 73 // Used to verify if etag of object has changed since last read. 74 var etag string 75 76 for req := range reqCh { 77 // If this is the first request we may not need to do a getObject request yet. 78 if req.isFirstReq { 79 // First request is a Read/ReadAt. 80 if req.isReadOp { 81 // Differentiate between wanting the whole object and just a range. 82 if req.isReadAt { 83 // If this is a ReadAt request only get the specified range. 84 // Range is set with respect to the offset and length of the buffer requested. 85 // Do not set objectInfo from the first readAt request because it will not get 86 // the whole object. 87 opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) 88 } else if req.Offset > 0 { 89 opts.SetRange(req.Offset, 0) 90 } 91 httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) 92 if err != nil { 93 resCh <- getResponse{Error: err} 94 return 95 } 96 etag = objectInfo.ETag 97 // Read at least firstReq.Buffer bytes, if not we have 98 // reached our EOF. 99 size, err := readFull(httpReader, req.Buffer) 100 totalRead += size 101 if size > 0 && err == io.ErrUnexpectedEOF { 102 if int64(size) < objectInfo.Size { 103 // In situations when returned size 104 // is less than the expected content 105 // length set by the server, make sure 106 // we return io.ErrUnexpectedEOF 107 err = io.ErrUnexpectedEOF 108 } else { 109 // If an EOF happens after reading some but not 110 // all the bytes ReadFull returns ErrUnexpectedEOF 111 err = io.EOF 112 } 113 } else if size == 0 && err == io.EOF && objectInfo.Size > 0 { 114 // Special cases when server writes more data 115 // than the content-length, net/http response 116 // body returns an error, instead of converting 117 // it to io.EOF - return unexpected EOF. 118 err = io.ErrUnexpectedEOF 119 } 120 // Send back the first response. 121 resCh <- getResponse{ 122 objectInfo: objectInfo, 123 Size: size, 124 Error: err, 125 didRead: true, 126 } 127 } else { 128 // First request is a Stat or Seek call. 129 // Only need to run a StatObject until an actual Read or ReadAt request comes through. 130 131 // Remove range header if already set, for stat Operations to get original file size. 132 delete(opts.headers, "Range") 133 objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) 134 if err != nil { 135 resCh <- getResponse{ 136 Error: err, 137 } 138 // Exit the go-routine. 139 return 140 } 141 etag = objectInfo.ETag 142 // Send back the first response. 143 resCh <- getResponse{ 144 objectInfo: objectInfo, 145 } 146 } 147 } else if req.settingObjectInfo { // Request is just to get objectInfo. 148 // Remove range header if already set, for stat Operations to get original file size. 149 delete(opts.headers, "Range") 150 // Check whether this is snowball 151 // if yes do not use If-Match feature 152 // it doesn't work. 153 if etag != "" && !snowball { 154 opts.SetMatchETag(etag) 155 } 156 objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) 157 if err != nil { 158 resCh <- getResponse{ 159 Error: err, 160 } 161 // Exit the goroutine. 162 return 163 } 164 // Send back the objectInfo. 165 resCh <- getResponse{ 166 objectInfo: objectInfo, 167 } 168 } else { 169 // Offset changes fetch the new object at an Offset. 170 // Because the httpReader may not be set by the first 171 // request if it was a stat or seek it must be checked 172 // if the object has been read or not to only initialize 173 // new ones when they haven't been already. 174 // All readAt requests are new requests. 175 if req.DidOffsetChange || !req.beenRead { 176 // Check whether this is snowball 177 // if yes do not use If-Match feature 178 // it doesn't work. 179 if etag != "" && !snowball { 180 opts.SetMatchETag(etag) 181 } 182 if httpReader != nil { 183 // Close previously opened http reader. 184 httpReader.Close() 185 } 186 // If this request is a readAt only get the specified range. 187 if req.isReadAt { 188 // Range is set with respect to the offset and length of the buffer requested. 189 opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) 190 } else if req.Offset > 0 { // Range is set with respect to the offset. 191 opts.SetRange(req.Offset, 0) 192 } else { 193 // Remove range header if already set 194 delete(opts.headers, "Range") 195 } 196 httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) 197 if err != nil { 198 resCh <- getResponse{ 199 Error: err, 200 } 201 return 202 } 203 totalRead = 0 204 } 205 206 // Read at least req.Buffer bytes, if not we have 207 // reached our EOF. 208 size, err := readFull(httpReader, req.Buffer) 209 totalRead += size 210 if size > 0 && err == io.ErrUnexpectedEOF { 211 if int64(totalRead) < objectInfo.Size { 212 // In situations when returned size 213 // is less than the expected content 214 // length set by the server, make sure 215 // we return io.ErrUnexpectedEOF 216 err = io.ErrUnexpectedEOF 217 } else { 218 // If an EOF happens after reading some but not 219 // all the bytes ReadFull returns ErrUnexpectedEOF 220 err = io.EOF 221 } 222 } else if size == 0 && err == io.EOF && objectInfo.Size > 0 { 223 // Special cases when server writes more data 224 // than the content-length, net/http response 225 // body returns an error, instead of converting 226 // it to io.EOF - return unexpected EOF. 227 err = io.ErrUnexpectedEOF 228 } 229 230 // Reply back how much was read. 231 resCh <- getResponse{ 232 Size: size, 233 Error: err, 234 didRead: true, 235 objectInfo: objectInfo, 236 } 237 } 238 } 239 }() 240 241 // Create a newObject through the information sent back by reqCh. 242 return newObject(gctx, cancel, reqCh, resCh), nil 243 } 244 245 // get request message container to communicate with internal 246 // go-routine. 247 type getRequest struct { 248 Buffer []byte 249 Offset int64 // readAt offset. 250 DidOffsetChange bool // Tracks the offset changes for Seek requests. 251 beenRead bool // Determines if this is the first time an object is being read. 252 isReadAt bool // Determines if this request is a request to a specific range 253 isReadOp bool // Determines if this request is a Read or Read/At request. 254 isFirstReq bool // Determines if this request is the first time an object is being accessed. 255 settingObjectInfo bool // Determines if this request is to set the objectInfo of an object. 256 } 257 258 // get response message container to reply back for the request. 259 type getResponse struct { 260 Size int 261 Error error 262 didRead bool // Lets subsequent calls know whether or not httpReader has been initiated. 263 objectInfo ObjectInfo // Used for the first request. 264 } 265 266 // Object represents an open object. It implements 267 // Reader, ReaderAt, Seeker, Closer for a HTTP stream. 268 type Object struct { 269 // Mutex. 270 mutex *sync.Mutex 271 272 // User allocated and defined. 273 reqCh chan<- getRequest 274 resCh <-chan getResponse 275 ctx context.Context 276 cancel context.CancelFunc 277 currOffset int64 278 objectInfo ObjectInfo 279 280 // Ask lower level to initiate data fetching based on currOffset 281 seekData bool 282 283 // Keeps track of closed call. 284 isClosed bool 285 286 // Keeps track of if this is the first call. 287 isStarted bool 288 289 // Previous error saved for future calls. 290 prevErr error 291 292 // Keeps track of if this object has been read yet. 293 beenRead bool 294 295 // Keeps track of if objectInfo has been set yet. 296 objectInfoSet bool 297 } 298 299 // doGetRequest - sends and blocks on the firstReqCh and reqCh of an object. 300 // Returns back the size of the buffer read, if anything was read, as well 301 // as any error encountered. For all first requests sent on the object 302 // it is also responsible for sending back the objectInfo. 303 func (o *Object) doGetRequest(request getRequest) (getResponse, error) { 304 select { 305 case <-o.ctx.Done(): 306 return getResponse{}, o.ctx.Err() 307 case o.reqCh <- request: 308 } 309 310 response := <-o.resCh 311 312 // Return any error to the top level. 313 if response.Error != nil { 314 return response, response.Error 315 } 316 317 // This was the first request. 318 if !o.isStarted { 319 // The object has been operated on. 320 o.isStarted = true 321 } 322 // Set the objectInfo if the request was not readAt 323 // and it hasn't been set before. 324 if !o.objectInfoSet && !request.isReadAt { 325 o.objectInfo = response.objectInfo 326 o.objectInfoSet = true 327 } 328 // Set beenRead only if it has not been set before. 329 if !o.beenRead { 330 o.beenRead = response.didRead 331 } 332 // Data are ready on the wire, no need to reinitiate connection in lower level 333 o.seekData = false 334 335 return response, nil 336 } 337 338 // setOffset - handles the setting of offsets for 339 // Read/ReadAt/Seek requests. 340 func (o *Object) setOffset(bytesRead int64) error { 341 // Update the currentOffset. 342 o.currOffset += bytesRead 343 344 if o.objectInfo.Size > -1 && o.currOffset >= o.objectInfo.Size { 345 return io.EOF 346 } 347 return nil 348 } 349 350 // Read reads up to len(b) bytes into b. It returns the number of 351 // bytes read (0 <= n <= len(b)) and any error encountered. Returns 352 // io.EOF upon end of file. 353 func (o *Object) Read(b []byte) (n int, err error) { 354 if o == nil { 355 return 0, errInvalidArgument("Object is nil") 356 } 357 358 // Locking. 359 o.mutex.Lock() 360 defer o.mutex.Unlock() 361 362 // prevErr is previous error saved from previous operation. 363 if o.prevErr != nil || o.isClosed { 364 return 0, o.prevErr 365 } 366 367 // Create a new request. 368 readReq := getRequest{ 369 isReadOp: true, 370 beenRead: o.beenRead, 371 Buffer: b, 372 } 373 374 // Alert that this is the first request. 375 if !o.isStarted { 376 readReq.isFirstReq = true 377 } 378 379 // Ask to establish a new data fetch routine based on seekData flag 380 readReq.DidOffsetChange = o.seekData 381 readReq.Offset = o.currOffset 382 383 // Send and receive from the first request. 384 response, err := o.doGetRequest(readReq) 385 if err != nil && err != io.EOF { 386 // Save the error for future calls. 387 o.prevErr = err 388 return response.Size, err 389 } 390 391 // Bytes read. 392 bytesRead := int64(response.Size) 393 394 // Set the new offset. 395 oerr := o.setOffset(bytesRead) 396 if oerr != nil { 397 // Save the error for future calls. 398 o.prevErr = oerr 399 return response.Size, oerr 400 } 401 402 // Return the response. 403 return response.Size, err 404 } 405 406 // Stat returns the ObjectInfo structure describing Object. 407 func (o *Object) Stat() (ObjectInfo, error) { 408 if o == nil { 409 return ObjectInfo{}, errInvalidArgument("Object is nil") 410 } 411 // Locking. 412 o.mutex.Lock() 413 defer o.mutex.Unlock() 414 415 if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed { 416 return ObjectInfo{}, o.prevErr 417 } 418 419 // This is the first request. 420 if !o.isStarted || !o.objectInfoSet { 421 // Send the request and get the response. 422 _, err := o.doGetRequest(getRequest{ 423 isFirstReq: !o.isStarted, 424 settingObjectInfo: !o.objectInfoSet, 425 }) 426 if err != nil { 427 o.prevErr = err 428 return ObjectInfo{}, err 429 } 430 } 431 432 return o.objectInfo, nil 433 } 434 435 // ReadAt reads len(b) bytes from the File starting at byte offset 436 // off. It returns the number of bytes read and the error, if any. 437 // ReadAt always returns a non-nil error when n < len(b). At end of 438 // file, that error is io.EOF. 439 func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) { 440 if o == nil { 441 return 0, errInvalidArgument("Object is nil") 442 } 443 444 // Locking. 445 o.mutex.Lock() 446 defer o.mutex.Unlock() 447 448 // prevErr is error which was saved in previous operation. 449 if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed { 450 return 0, o.prevErr 451 } 452 453 // Set the current offset to ReadAt offset, because the current offset will be shifted at the end of this method. 454 o.currOffset = offset 455 456 // Can only compare offsets to size when size has been set. 457 if o.objectInfoSet { 458 // If offset is negative than we return io.EOF. 459 // If offset is greater than or equal to object size we return io.EOF. 460 if (o.objectInfo.Size > -1 && offset >= o.objectInfo.Size) || offset < 0 { 461 return 0, io.EOF 462 } 463 } 464 465 // Create the new readAt request. 466 readAtReq := getRequest{ 467 isReadOp: true, 468 isReadAt: true, 469 DidOffsetChange: true, // Offset always changes. 470 beenRead: o.beenRead, // Set if this is the first request to try and read. 471 Offset: offset, // Set the offset. 472 Buffer: b, 473 } 474 475 // Alert that this is the first request. 476 if !o.isStarted { 477 readAtReq.isFirstReq = true 478 } 479 480 // Send and receive from the first request. 481 response, err := o.doGetRequest(readAtReq) 482 if err != nil && err != io.EOF { 483 // Save the error. 484 o.prevErr = err 485 return response.Size, err 486 } 487 // Bytes read. 488 bytesRead := int64(response.Size) 489 // There is no valid objectInfo yet 490 // to compare against for EOF. 491 if !o.objectInfoSet { 492 // Update the currentOffset. 493 o.currOffset += bytesRead 494 } else { 495 // If this was not the first request update 496 // the offsets and compare against objectInfo 497 // for EOF. 498 oerr := o.setOffset(bytesRead) 499 if oerr != nil { 500 o.prevErr = oerr 501 return response.Size, oerr 502 } 503 } 504 return response.Size, err 505 } 506 507 // Seek sets the offset for the next Read or Write to offset, 508 // interpreted according to whence: 0 means relative to the 509 // origin of the file, 1 means relative to the current offset, 510 // and 2 means relative to the end. 511 // Seek returns the new offset and an error, if any. 512 // 513 // Seeking to a negative offset is an error. Seeking to any positive 514 // offset is legal, subsequent io operations succeed until the 515 // underlying object is not closed. 516 func (o *Object) Seek(offset int64, whence int) (n int64, err error) { 517 if o == nil { 518 return 0, errInvalidArgument("Object is nil") 519 } 520 521 // Locking. 522 o.mutex.Lock() 523 defer o.mutex.Unlock() 524 525 // At EOF seeking is legal allow only io.EOF, for any other errors we return. 526 if o.prevErr != nil && o.prevErr != io.EOF { 527 return 0, o.prevErr 528 } 529 530 // Negative offset is valid for whence of '2'. 531 if offset < 0 && whence != 2 { 532 return 0, errInvalidArgument(fmt.Sprintf("Negative position not allowed for %d", whence)) 533 } 534 535 // This is the first request. So before anything else 536 // get the ObjectInfo. 537 if !o.isStarted || !o.objectInfoSet { 538 // Create the new Seek request. 539 seekReq := getRequest{ 540 isReadOp: false, 541 Offset: offset, 542 isFirstReq: true, 543 } 544 // Send and receive from the seek request. 545 _, err := o.doGetRequest(seekReq) 546 if err != nil { 547 // Save the error. 548 o.prevErr = err 549 return 0, err 550 } 551 } 552 553 // Switch through whence. 554 switch whence { 555 default: 556 return 0, errInvalidArgument(fmt.Sprintf("Invalid whence %d", whence)) 557 case 0: 558 if o.objectInfo.Size > -1 && offset > o.objectInfo.Size { 559 return 0, io.EOF 560 } 561 o.currOffset = offset 562 case 1: 563 if o.objectInfo.Size > -1 && o.currOffset+offset > o.objectInfo.Size { 564 return 0, io.EOF 565 } 566 o.currOffset += offset 567 case 2: 568 // If we don't know the object size return an error for io.SeekEnd 569 if o.objectInfo.Size < 0 { 570 return 0, errInvalidArgument("Whence END is not supported when the object size is unknown") 571 } 572 // Seeking to positive offset is valid for whence '2', but 573 // since we are backing a Reader we have reached 'EOF' if 574 // offset is positive. 575 if offset > 0 { 576 return 0, io.EOF 577 } 578 // Seeking to negative position not allowed for whence. 579 if o.objectInfo.Size+offset < 0 { 580 return 0, errInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence)) 581 } 582 o.currOffset = o.objectInfo.Size + offset 583 } 584 // Reset the saved error since we successfully seeked, let the Read 585 // and ReadAt decide. 586 if o.prevErr == io.EOF { 587 o.prevErr = nil 588 } 589 590 // Ask lower level to fetch again from source 591 o.seekData = true 592 593 // Return the effective offset. 594 return o.currOffset, nil 595 } 596 597 // Close - The behavior of Close after the first call returns error 598 // for subsequent Close() calls. 599 func (o *Object) Close() (err error) { 600 if o == nil { 601 return errInvalidArgument("Object is nil") 602 } 603 604 // Locking. 605 o.mutex.Lock() 606 defer o.mutex.Unlock() 607 608 // if already closed return an error. 609 if o.isClosed { 610 return o.prevErr 611 } 612 613 // Close successfully. 614 o.cancel() 615 616 // Close the request channel to indicate the internal go-routine to exit. 617 close(o.reqCh) 618 619 // Save for future operations. 620 errMsg := "Object is already closed. Bad file descriptor." 621 o.prevErr = errors.New(errMsg) 622 // Save here that we closed done channel successfully. 623 o.isClosed = true 624 return nil 625 } 626 627 // newObject instantiates a new *minio.Object* 628 // ObjectInfo will be set by setObjectInfo 629 func newObject(ctx context.Context, cancel context.CancelFunc, reqCh chan<- getRequest, resCh <-chan getResponse) *Object { 630 return &Object{ 631 ctx: ctx, 632 cancel: cancel, 633 mutex: &sync.Mutex{}, 634 reqCh: reqCh, 635 resCh: resCh, 636 } 637 } 638 639 // getObject - retrieve object from Object Storage. 640 // 641 // Additionally this function also takes range arguments to download the specified 642 // range bytes of an object. Setting offset and length = 0 will download the full object. 643 // 644 // For more information about the HTTP Range header. 645 // go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35. 646 func (c *Client) getObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (io.ReadCloser, ObjectInfo, http.Header, error) { 647 // Validate input arguments. 648 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 649 return nil, ObjectInfo{}, nil, err 650 } 651 if err := s3utils.CheckValidObjectName(objectName); err != nil { 652 return nil, ObjectInfo{}, nil, err 653 } 654 655 // Execute GET on objectName. 656 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ 657 bucketName: bucketName, 658 objectName: objectName, 659 queryValues: opts.toQueryValues(), 660 customHeader: opts.Header(), 661 contentSHA256Hex: emptySHA256Hex, 662 }) 663 if err != nil { 664 return nil, ObjectInfo{}, nil, err 665 } 666 if resp != nil { 667 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { 668 return nil, ObjectInfo{}, nil, httpRespToErrorResponse(resp, bucketName, objectName) 669 } 670 } 671 672 objectStat, err := ToObjectInfo(bucketName, objectName, resp.Header) 673 if err != nil { 674 closeResponse(resp) 675 return nil, ObjectInfo{}, nil, err 676 } 677 678 // do not close body here, caller will close 679 return resp.Body, objectStat, resp.Header, nil 680 }