api-select.go (21611B)
1 /* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * (C) 2018-2020 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package minio 19 20 import ( 21 "bytes" 22 "context" 23 "encoding/binary" 24 "encoding/xml" 25 "errors" 26 "fmt" 27 "hash" 28 "hash/crc32" 29 "io" 30 "net/http" 31 "net/url" 32 "strings" 33 34 "github.com/minio/minio-go/v7/pkg/encrypt" 35 "github.com/minio/minio-go/v7/pkg/s3utils" 36 ) 37 38 // CSVFileHeaderInfo - is the parameter for whether to utilize headers. 39 type CSVFileHeaderInfo string 40 41 // Constants for file header info. 42 const ( 43 CSVFileHeaderInfoNone CSVFileHeaderInfo = "NONE" 44 CSVFileHeaderInfoIgnore CSVFileHeaderInfo = "IGNORE" 45 CSVFileHeaderInfoUse CSVFileHeaderInfo = "USE" 46 ) 47 48 // SelectCompressionType - is the parameter for what type of compression is 49 // present 50 type SelectCompressionType string 51 52 // Constants for compression types under select API. 53 const ( 54 SelectCompressionNONE SelectCompressionType = "NONE" 55 SelectCompressionGZIP SelectCompressionType = "GZIP" 56 SelectCompressionBZIP SelectCompressionType = "BZIP2" 57 58 // Non-standard compression schemes, supported by MinIO hosts: 59 60 SelectCompressionZSTD SelectCompressionType = "ZSTD" // Zstandard compression. 61 SelectCompressionLZ4 SelectCompressionType = "LZ4" // LZ4 Stream 62 SelectCompressionS2 SelectCompressionType = "S2" // S2 Stream 63 SelectCompressionSNAPPY SelectCompressionType = "SNAPPY" // Snappy stream 64 ) 65 66 // CSVQuoteFields - is the parameter for how CSV fields are quoted. 67 type CSVQuoteFields string 68 69 // Constants for csv quote styles. 70 const ( 71 CSVQuoteFieldsAlways CSVQuoteFields = "Always" 72 CSVQuoteFieldsAsNeeded CSVQuoteFields = "AsNeeded" 73 ) 74 75 // QueryExpressionType - is of what syntax the expression is, this should only 76 // be SQL 77 type QueryExpressionType string 78 79 // Constants for expression type. 80 const ( 81 QueryExpressionTypeSQL QueryExpressionType = "SQL" 82 ) 83 84 // JSONType determines json input serialization type. 85 type JSONType string 86 87 // Constants for JSONTypes. 88 const ( 89 JSONDocumentType JSONType = "DOCUMENT" 90 JSONLinesType JSONType = "LINES" 91 ) 92 93 // ParquetInputOptions parquet input specific options 94 type ParquetInputOptions struct{} 95 96 // CSVInputOptions csv input specific options 97 type CSVInputOptions struct { 98 FileHeaderInfo CSVFileHeaderInfo 99 fileHeaderInfoSet bool 100 101 RecordDelimiter string 102 recordDelimiterSet bool 103 104 FieldDelimiter string 105 fieldDelimiterSet bool 106 107 QuoteCharacter string 108 quoteCharacterSet bool 109 110 QuoteEscapeCharacter string 111 quoteEscapeCharacterSet bool 112 113 Comments string 114 commentsSet bool 115 } 116 117 // SetFileHeaderInfo sets the file header info in the CSV input options 118 func (c *CSVInputOptions) SetFileHeaderInfo(val CSVFileHeaderInfo) { 119 c.FileHeaderInfo = val 120 c.fileHeaderInfoSet = true 121 } 122 123 // SetRecordDelimiter sets the record delimiter in the CSV input options 124 func (c *CSVInputOptions) SetRecordDelimiter(val string) { 125 c.RecordDelimiter = val 126 c.recordDelimiterSet = true 127 } 128 129 // SetFieldDelimiter sets the field delimiter in the CSV input options 130 func (c *CSVInputOptions) SetFieldDelimiter(val string) { 131 c.FieldDelimiter = val 132 c.fieldDelimiterSet = true 133 } 134 135 // SetQuoteCharacter sets the quote character in the CSV input options 136 func (c *CSVInputOptions) SetQuoteCharacter(val string) { 137 c.QuoteCharacter = val 138 c.quoteCharacterSet = true 139 } 140 141 // SetQuoteEscapeCharacter sets the quote escape character in the CSV input options 142 func (c *CSVInputOptions) SetQuoteEscapeCharacter(val string) { 143 c.QuoteEscapeCharacter = val 144 c.quoteEscapeCharacterSet = true 145 } 146 147 // SetComments sets the comments character in the CSV input options 148 func (c *CSVInputOptions) SetComments(val string) { 149 c.Comments = val 150 c.commentsSet = true 151 } 152 153 // MarshalXML - produces the xml representation of the CSV input options struct 154 func (c CSVInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { 155 if err := e.EncodeToken(start); err != nil { 156 return err 157 } 158 if c.FileHeaderInfo != "" || c.fileHeaderInfoSet { 159 if err := e.EncodeElement(c.FileHeaderInfo, xml.StartElement{Name: xml.Name{Local: "FileHeaderInfo"}}); err != nil { 160 return err 161 } 162 } 163 164 if c.RecordDelimiter != "" || c.recordDelimiterSet { 165 if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { 166 return err 167 } 168 } 169 170 if c.FieldDelimiter != "" || c.fieldDelimiterSet { 171 if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil { 172 return err 173 } 174 } 175 176 if c.QuoteCharacter != "" || c.quoteCharacterSet { 177 if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil { 178 return err 179 } 180 } 181 182 if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet { 183 if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil { 184 return err 185 } 186 } 187 188 if c.Comments != "" || c.commentsSet { 189 if err := e.EncodeElement(c.Comments, xml.StartElement{Name: xml.Name{Local: "Comments"}}); err != nil { 190 return err 191 } 192 } 193 194 return e.EncodeToken(xml.EndElement{Name: start.Name}) 195 } 196 197 // CSVOutputOptions csv output specific options 198 type CSVOutputOptions struct { 199 QuoteFields CSVQuoteFields 200 quoteFieldsSet bool 201 202 RecordDelimiter string 203 recordDelimiterSet bool 204 205 FieldDelimiter string 206 fieldDelimiterSet bool 207 208 QuoteCharacter string 209 quoteCharacterSet bool 210 211 QuoteEscapeCharacter string 212 quoteEscapeCharacterSet bool 213 } 214 215 // SetQuoteFields sets the quote field parameter in the CSV output options 216 func (c *CSVOutputOptions) SetQuoteFields(val CSVQuoteFields) { 217 c.QuoteFields = val 218 c.quoteFieldsSet = true 219 } 220 221 // SetRecordDelimiter sets the record delimiter character in the CSV output options 222 func (c *CSVOutputOptions) SetRecordDelimiter(val string) { 223 c.RecordDelimiter = val 224 c.recordDelimiterSet = true 225 } 226 227 // SetFieldDelimiter sets the field delimiter character in the CSV output options 228 func (c *CSVOutputOptions) SetFieldDelimiter(val string) { 229 c.FieldDelimiter = val 230 c.fieldDelimiterSet = true 231 } 232 233 // SetQuoteCharacter sets the quote character in the CSV output options 234 func (c *CSVOutputOptions) SetQuoteCharacter(val string) { 235 c.QuoteCharacter = val 236 c.quoteCharacterSet = true 237 } 238 239 // SetQuoteEscapeCharacter sets the quote escape character in the CSV output options 240 func (c *CSVOutputOptions) SetQuoteEscapeCharacter(val string) { 241 c.QuoteEscapeCharacter = val 242 c.quoteEscapeCharacterSet = true 243 } 244 245 // MarshalXML - produces the xml representation of the CSVOutputOptions struct 246 func (c CSVOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { 247 if err := e.EncodeToken(start); err != nil { 248 return err 249 } 250 251 if c.QuoteFields != "" || c.quoteFieldsSet { 252 if err := e.EncodeElement(c.QuoteFields, xml.StartElement{Name: xml.Name{Local: "QuoteFields"}}); err != nil { 253 return err 254 } 255 } 256 257 if c.RecordDelimiter != "" || c.recordDelimiterSet { 258 if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { 259 return err 260 } 261 } 262 263 if c.FieldDelimiter != "" || c.fieldDelimiterSet { 264 if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil { 265 return err 266 } 267 } 268 269 if c.QuoteCharacter != "" || c.quoteCharacterSet { 270 if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil { 271 return err 272 } 273 } 274 275 if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet { 276 if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil { 277 return err 278 } 279 } 280 281 return e.EncodeToken(xml.EndElement{Name: start.Name}) 282 } 283 284 // JSONInputOptions json input specific options 285 type JSONInputOptions struct { 286 Type JSONType 287 typeSet bool 288 } 289 290 // SetType sets the JSON type in the JSON input options 291 func (j *JSONInputOptions) SetType(typ JSONType) { 292 j.Type = typ 293 j.typeSet = true 294 } 295 296 // MarshalXML - produces the xml representation of the JSONInputOptions struct 297 func (j JSONInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { 298 if err := e.EncodeToken(start); err != nil { 299 return err 300 } 301 302 if j.Type != "" || j.typeSet { 303 if err := e.EncodeElement(j.Type, xml.StartElement{Name: xml.Name{Local: "Type"}}); err != nil { 304 return err 305 } 306 } 307 308 return e.EncodeToken(xml.EndElement{Name: start.Name}) 309 } 310 311 // JSONOutputOptions - json output specific options 312 type JSONOutputOptions struct { 313 RecordDelimiter string 314 recordDelimiterSet bool 315 } 316 317 // SetRecordDelimiter sets the record delimiter in the JSON output options 318 func (j *JSONOutputOptions) SetRecordDelimiter(val string) { 319 j.RecordDelimiter = val 320 j.recordDelimiterSet = true 321 } 322 323 // MarshalXML - produces the xml representation of the JSONOutputOptions struct 324 func (j JSONOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { 325 if err := e.EncodeToken(start); err != nil { 326 return err 327 } 328 329 if j.RecordDelimiter != "" || j.recordDelimiterSet { 330 if err := e.EncodeElement(j.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { 331 return err 332 } 333 } 334 335 return e.EncodeToken(xml.EndElement{Name: start.Name}) 336 } 337 338 // SelectObjectInputSerialization - input serialization parameters 339 type SelectObjectInputSerialization struct { 340 CompressionType SelectCompressionType `xml:"CompressionType,omitempty"` 341 Parquet *ParquetInputOptions `xml:"Parquet,omitempty"` 342 CSV *CSVInputOptions `xml:"CSV,omitempty"` 343 JSON *JSONInputOptions `xml:"JSON,omitempty"` 344 } 345 346 // SelectObjectOutputSerialization - output serialization parameters. 347 type SelectObjectOutputSerialization struct { 348 CSV *CSVOutputOptions `xml:"CSV,omitempty"` 349 JSON *JSONOutputOptions `xml:"JSON,omitempty"` 350 } 351 352 // SelectObjectOptions - represents the input select body 353 type SelectObjectOptions struct { 354 XMLName xml.Name `xml:"SelectObjectContentRequest" json:"-"` 355 ServerSideEncryption encrypt.ServerSide `xml:"-"` 356 Expression string 357 ExpressionType QueryExpressionType 358 InputSerialization SelectObjectInputSerialization 359 OutputSerialization SelectObjectOutputSerialization 360 RequestProgress struct { 361 Enabled bool 362 } 363 } 364 365 // Header returns the http.Header representation of the SelectObject options. 366 func (o SelectObjectOptions) Header() http.Header { 367 headers := make(http.Header) 368 if o.ServerSideEncryption != nil && o.ServerSideEncryption.Type() == encrypt.SSEC { 369 o.ServerSideEncryption.Marshal(headers) 370 } 371 return headers 372 } 373 374 // SelectObjectType - is the parameter which defines what type of object the 375 // operation is being performed on. 376 type SelectObjectType string 377 378 // Constants for input data types. 379 const ( 380 SelectObjectTypeCSV SelectObjectType = "CSV" 381 SelectObjectTypeJSON SelectObjectType = "JSON" 382 SelectObjectTypeParquet SelectObjectType = "Parquet" 383 ) 384 385 // preludeInfo is used for keeping track of necessary information from the 386 // prelude. 387 type preludeInfo struct { 388 totalLen uint32 389 headerLen uint32 390 } 391 392 // SelectResults is used for the streaming responses from the server. 393 type SelectResults struct { 394 pipeReader *io.PipeReader 395 resp *http.Response 396 stats *StatsMessage 397 progress *ProgressMessage 398 } 399 400 // ProgressMessage is a struct for progress xml message. 401 type ProgressMessage struct { 402 XMLName xml.Name `xml:"Progress" json:"-"` 403 StatsMessage 404 } 405 406 // StatsMessage is a struct for stat xml message. 407 type StatsMessage struct { 408 XMLName xml.Name `xml:"Stats" json:"-"` 409 BytesScanned int64 410 BytesProcessed int64 411 BytesReturned int64 412 } 413 414 // messageType represents the type of message. 415 type messageType string 416 417 const ( 418 errorMsg messageType = "error" 419 commonMsg messageType = "event" 420 ) 421 422 // eventType represents the type of event. 423 type eventType string 424 425 // list of event-types returned by Select API. 426 const ( 427 endEvent eventType = "End" 428 recordsEvent eventType = "Records" 429 progressEvent eventType = "Progress" 430 statsEvent eventType = "Stats" 431 ) 432 433 // contentType represents content type of event. 434 type contentType string 435 436 const ( 437 xmlContent contentType = "text/xml" 438 ) 439 440 // SelectObjectContent is a implementation of http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html AWS S3 API. 441 func (c *Client) SelectObjectContent(ctx context.Context, bucketName, objectName string, opts SelectObjectOptions) (*SelectResults, error) { 442 // Input validation. 443 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 444 return nil, err 445 } 446 if err := s3utils.CheckValidObjectName(objectName); err != nil { 447 return nil, err 448 } 449 450 selectReqBytes, err := xml.Marshal(opts) 451 if err != nil { 452 return nil, err 453 } 454 455 urlValues := make(url.Values) 456 urlValues.Set("select", "") 457 urlValues.Set("select-type", "2") 458 459 // Execute POST on bucket/object. 460 resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{ 461 bucketName: bucketName, 462 objectName: objectName, 463 queryValues: urlValues, 464 customHeader: opts.Header(), 465 contentMD5Base64: sumMD5Base64(selectReqBytes), 466 contentSHA256Hex: sum256Hex(selectReqBytes), 467 contentBody: bytes.NewReader(selectReqBytes), 468 contentLength: int64(len(selectReqBytes)), 469 }) 470 if err != nil { 471 return nil, err 472 } 473 474 return NewSelectResults(resp, bucketName) 475 } 476 477 // NewSelectResults creates a Select Result parser that parses the response 478 // and returns a Reader that will return parsed and assembled select output. 479 func NewSelectResults(resp *http.Response, bucketName string) (*SelectResults, error) { 480 if resp.StatusCode != http.StatusOK { 481 return nil, httpRespToErrorResponse(resp, bucketName, "") 482 } 483 484 pipeReader, pipeWriter := io.Pipe() 485 streamer := &SelectResults{ 486 resp: resp, 487 stats: &StatsMessage{}, 488 progress: &ProgressMessage{}, 489 pipeReader: pipeReader, 490 } 491 streamer.start(pipeWriter) 492 return streamer, nil 493 } 494 495 // Close - closes the underlying response body and the stream reader. 496 func (s *SelectResults) Close() error { 497 defer closeResponse(s.resp) 498 return s.pipeReader.Close() 499 } 500 501 // Read - is a reader compatible implementation for SelectObjectContent records. 502 func (s *SelectResults) Read(b []byte) (n int, err error) { 503 return s.pipeReader.Read(b) 504 } 505 506 // Stats - information about a request's stats when processing is complete. 507 func (s *SelectResults) Stats() *StatsMessage { 508 return s.stats 509 } 510 511 // Progress - information about the progress of a request. 512 func (s *SelectResults) Progress() *ProgressMessage { 513 return s.progress 514 } 515 516 // start is the main function that decodes the large byte array into 517 // several events that are sent through the eventstream. 518 func (s *SelectResults) start(pipeWriter *io.PipeWriter) { 519 go func() { 520 for { 521 var prelude preludeInfo 522 headers := make(http.Header) 523 var err error 524 525 // Create CRC code 526 crc := crc32.New(crc32.IEEETable) 527 crcReader := io.TeeReader(s.resp.Body, crc) 528 529 // Extract the prelude(12 bytes) into a struct to extract relevant information. 530 prelude, err = processPrelude(crcReader, crc) 531 if err != nil { 532 pipeWriter.CloseWithError(err) 533 closeResponse(s.resp) 534 return 535 } 536 537 // Extract the headers(variable bytes) into a struct to extract relevant information 538 if prelude.headerLen > 0 { 539 if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil { 540 pipeWriter.CloseWithError(err) 541 closeResponse(s.resp) 542 return 543 } 544 } 545 546 // Get the actual payload length so that the appropriate amount of 547 // bytes can be read or parsed. 548 payloadLen := prelude.PayloadLen() 549 550 m := messageType(headers.Get("message-type")) 551 552 switch m { 553 case errorMsg: 554 pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\"")) 555 closeResponse(s.resp) 556 return 557 case commonMsg: 558 // Get content-type of the payload. 559 c := contentType(headers.Get("content-type")) 560 561 // Get event type of the payload. 562 e := eventType(headers.Get("event-type")) 563 564 // Handle all supported events. 565 switch e { 566 case endEvent: 567 pipeWriter.Close() 568 closeResponse(s.resp) 569 return 570 case recordsEvent: 571 if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { 572 pipeWriter.CloseWithError(err) 573 closeResponse(s.resp) 574 return 575 } 576 case progressEvent: 577 switch c { 578 case xmlContent: 579 if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { 580 pipeWriter.CloseWithError(err) 581 closeResponse(s.resp) 582 return 583 } 584 default: 585 pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) 586 closeResponse(s.resp) 587 return 588 } 589 case statsEvent: 590 switch c { 591 case xmlContent: 592 if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { 593 pipeWriter.CloseWithError(err) 594 closeResponse(s.resp) 595 return 596 } 597 default: 598 pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) 599 closeResponse(s.resp) 600 return 601 } 602 } 603 } 604 605 // Ensures that the full message's CRC is correct and 606 // that the message is not corrupted 607 if err := checkCRC(s.resp.Body, crc.Sum32()); err != nil { 608 pipeWriter.CloseWithError(err) 609 closeResponse(s.resp) 610 return 611 } 612 613 } 614 }() 615 } 616 617 // PayloadLen is a function that calculates the length of the payload. 618 func (p preludeInfo) PayloadLen() int64 { 619 return int64(p.totalLen - p.headerLen - 16) 620 } 621 622 // processPrelude is the function that reads the 12 bytes of the prelude and 623 // ensures the CRC is correct while also extracting relevant information into 624 // the struct, 625 func processPrelude(prelude io.Reader, crc hash.Hash32) (preludeInfo, error) { 626 var err error 627 pInfo := preludeInfo{} 628 629 // reads total length of the message (first 4 bytes) 630 pInfo.totalLen, err = extractUint32(prelude) 631 if err != nil { 632 return pInfo, err 633 } 634 635 // reads total header length of the message (2nd 4 bytes) 636 pInfo.headerLen, err = extractUint32(prelude) 637 if err != nil { 638 return pInfo, err 639 } 640 641 // checks that the CRC is correct (3rd 4 bytes) 642 preCRC := crc.Sum32() 643 if err := checkCRC(prelude, preCRC); err != nil { 644 return pInfo, err 645 } 646 647 return pInfo, nil 648 } 649 650 // extracts the relevant information from the Headers. 651 func extractHeader(body io.Reader, myHeaders http.Header) error { 652 for { 653 // extracts the first part of the header, 654 headerTypeName, err := extractHeaderType(body) 655 if err != nil { 656 // Since end of file, we have read all of our headers 657 if err == io.EOF { 658 break 659 } 660 return err 661 } 662 663 // reads the 7 present in the header and ignores it. 664 extractUint8(body) 665 666 headerValueName, err := extractHeaderValue(body) 667 if err != nil { 668 return err 669 } 670 671 myHeaders.Set(headerTypeName, headerValueName) 672 673 } 674 return nil 675 } 676 677 // extractHeaderType extracts the first half of the header message, the header type. 678 func extractHeaderType(body io.Reader) (string, error) { 679 // extracts 2 bit integer 680 headerNameLen, err := extractUint8(body) 681 if err != nil { 682 return "", err 683 } 684 // extracts the string with the appropriate number of bytes 685 headerName, err := extractString(body, int(headerNameLen)) 686 if err != nil { 687 return "", err 688 } 689 return strings.TrimPrefix(headerName, ":"), nil 690 } 691 692 // extractsHeaderValue extracts the second half of the header message, the 693 // header value 694 func extractHeaderValue(body io.Reader) (string, error) { 695 bodyLen, err := extractUint16(body) 696 if err != nil { 697 return "", err 698 } 699 bodyName, err := extractString(body, int(bodyLen)) 700 if err != nil { 701 return "", err 702 } 703 return bodyName, nil 704 } 705 706 // extracts a string from byte array of a particular number of bytes. 707 func extractString(source io.Reader, lenBytes int) (string, error) { 708 myVal := make([]byte, lenBytes) 709 _, err := source.Read(myVal) 710 if err != nil { 711 return "", err 712 } 713 return string(myVal), nil 714 } 715 716 // extractUint32 extracts a 4 byte integer from the byte array. 717 func extractUint32(r io.Reader) (uint32, error) { 718 buf := make([]byte, 4) 719 _, err := readFull(r, buf) 720 if err != nil { 721 return 0, err 722 } 723 return binary.BigEndian.Uint32(buf), nil 724 } 725 726 // extractUint16 extracts a 2 byte integer from the byte array. 727 func extractUint16(r io.Reader) (uint16, error) { 728 buf := make([]byte, 2) 729 _, err := readFull(r, buf) 730 if err != nil { 731 return 0, err 732 } 733 return binary.BigEndian.Uint16(buf), nil 734 } 735 736 // extractUint8 extracts a 1 byte integer from the byte array. 737 func extractUint8(r io.Reader) (uint8, error) { 738 buf := make([]byte, 1) 739 _, err := readFull(r, buf) 740 if err != nil { 741 return 0, err 742 } 743 return buf[0], nil 744 } 745 746 // checkCRC ensures that the CRC matches with the one from the reader. 747 func checkCRC(r io.Reader, expect uint32) error { 748 msgCRC, err := extractUint32(r) 749 if err != nil { 750 return err 751 } 752 753 if msgCRC != expect { 754 return fmt.Errorf("Checksum Mismatch, MessageCRC of 0x%X does not equal expected CRC of 0x%X", msgCRC, expect) 755 } 756 return nil 757 }