gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

api-select.go (21611B)


      1 /*
      2  * MinIO Go Library for Amazon S3 Compatible Cloud Storage
      3  * (C) 2018-2020 MinIO, Inc.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package minio
     19 
     20 import (
     21 	"bytes"
     22 	"context"
     23 	"encoding/binary"
     24 	"encoding/xml"
     25 	"errors"
     26 	"fmt"
     27 	"hash"
     28 	"hash/crc32"
     29 	"io"
     30 	"net/http"
     31 	"net/url"
     32 	"strings"
     33 
     34 	"github.com/minio/minio-go/v7/pkg/encrypt"
     35 	"github.com/minio/minio-go/v7/pkg/s3utils"
     36 )
     37 
     38 // CSVFileHeaderInfo - is the parameter for whether to utilize headers.
     39 type CSVFileHeaderInfo string
     40 
     41 // Constants for file header info.
     42 const (
     43 	CSVFileHeaderInfoNone   CSVFileHeaderInfo = "NONE"
     44 	CSVFileHeaderInfoIgnore CSVFileHeaderInfo = "IGNORE"
     45 	CSVFileHeaderInfoUse    CSVFileHeaderInfo = "USE"
     46 )
     47 
     48 // SelectCompressionType - is the parameter for what type of compression is
     49 // present
     50 type SelectCompressionType string
     51 
     52 // Constants for compression types under select API.
     53 const (
     54 	SelectCompressionNONE SelectCompressionType = "NONE"
     55 	SelectCompressionGZIP SelectCompressionType = "GZIP"
     56 	SelectCompressionBZIP SelectCompressionType = "BZIP2"
     57 
     58 	// Non-standard compression schemes, supported by MinIO hosts:
     59 
     60 	SelectCompressionZSTD   SelectCompressionType = "ZSTD"   // Zstandard compression.
     61 	SelectCompressionLZ4    SelectCompressionType = "LZ4"    // LZ4 Stream
     62 	SelectCompressionS2     SelectCompressionType = "S2"     // S2 Stream
     63 	SelectCompressionSNAPPY SelectCompressionType = "SNAPPY" // Snappy stream
     64 )
     65 
     66 // CSVQuoteFields - is the parameter for how CSV fields are quoted.
     67 type CSVQuoteFields string
     68 
     69 // Constants for csv quote styles.
     70 const (
     71 	CSVQuoteFieldsAlways   CSVQuoteFields = "Always"
     72 	CSVQuoteFieldsAsNeeded CSVQuoteFields = "AsNeeded"
     73 )
     74 
     75 // QueryExpressionType - is of what syntax the expression is, this should only
     76 // be SQL
     77 type QueryExpressionType string
     78 
     79 // Constants for expression type.
     80 const (
     81 	QueryExpressionTypeSQL QueryExpressionType = "SQL"
     82 )
     83 
     84 // JSONType determines json input serialization type.
     85 type JSONType string
     86 
     87 // Constants for JSONTypes.
     88 const (
     89 	JSONDocumentType JSONType = "DOCUMENT"
     90 	JSONLinesType    JSONType = "LINES"
     91 )
     92 
     93 // ParquetInputOptions parquet input specific options
     94 type ParquetInputOptions struct{}
     95 
     96 // CSVInputOptions csv input specific options
     97 type CSVInputOptions struct {
     98 	FileHeaderInfo    CSVFileHeaderInfo
     99 	fileHeaderInfoSet bool
    100 
    101 	RecordDelimiter    string
    102 	recordDelimiterSet bool
    103 
    104 	FieldDelimiter    string
    105 	fieldDelimiterSet bool
    106 
    107 	QuoteCharacter    string
    108 	quoteCharacterSet bool
    109 
    110 	QuoteEscapeCharacter    string
    111 	quoteEscapeCharacterSet bool
    112 
    113 	Comments    string
    114 	commentsSet bool
    115 }
    116 
    117 // SetFileHeaderInfo sets the file header info in the CSV input options
    118 func (c *CSVInputOptions) SetFileHeaderInfo(val CSVFileHeaderInfo) {
    119 	c.FileHeaderInfo = val
    120 	c.fileHeaderInfoSet = true
    121 }
    122 
    123 // SetRecordDelimiter sets the record delimiter in the CSV input options
    124 func (c *CSVInputOptions) SetRecordDelimiter(val string) {
    125 	c.RecordDelimiter = val
    126 	c.recordDelimiterSet = true
    127 }
    128 
    129 // SetFieldDelimiter sets the field delimiter in the CSV input options
    130 func (c *CSVInputOptions) SetFieldDelimiter(val string) {
    131 	c.FieldDelimiter = val
    132 	c.fieldDelimiterSet = true
    133 }
    134 
    135 // SetQuoteCharacter sets the quote character in the CSV input options
    136 func (c *CSVInputOptions) SetQuoteCharacter(val string) {
    137 	c.QuoteCharacter = val
    138 	c.quoteCharacterSet = true
    139 }
    140 
    141 // SetQuoteEscapeCharacter sets the quote escape character in the CSV input options
    142 func (c *CSVInputOptions) SetQuoteEscapeCharacter(val string) {
    143 	c.QuoteEscapeCharacter = val
    144 	c.quoteEscapeCharacterSet = true
    145 }
    146 
    147 // SetComments sets the comments character in the CSV input options
    148 func (c *CSVInputOptions) SetComments(val string) {
    149 	c.Comments = val
    150 	c.commentsSet = true
    151 }
    152 
    153 // MarshalXML - produces the xml representation of the CSV input options struct
    154 func (c CSVInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
    155 	if err := e.EncodeToken(start); err != nil {
    156 		return err
    157 	}
    158 	if c.FileHeaderInfo != "" || c.fileHeaderInfoSet {
    159 		if err := e.EncodeElement(c.FileHeaderInfo, xml.StartElement{Name: xml.Name{Local: "FileHeaderInfo"}}); err != nil {
    160 			return err
    161 		}
    162 	}
    163 
    164 	if c.RecordDelimiter != "" || c.recordDelimiterSet {
    165 		if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil {
    166 			return err
    167 		}
    168 	}
    169 
    170 	if c.FieldDelimiter != "" || c.fieldDelimiterSet {
    171 		if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil {
    172 			return err
    173 		}
    174 	}
    175 
    176 	if c.QuoteCharacter != "" || c.quoteCharacterSet {
    177 		if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil {
    178 			return err
    179 		}
    180 	}
    181 
    182 	if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet {
    183 		if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil {
    184 			return err
    185 		}
    186 	}
    187 
    188 	if c.Comments != "" || c.commentsSet {
    189 		if err := e.EncodeElement(c.Comments, xml.StartElement{Name: xml.Name{Local: "Comments"}}); err != nil {
    190 			return err
    191 		}
    192 	}
    193 
    194 	return e.EncodeToken(xml.EndElement{Name: start.Name})
    195 }
    196 
    197 // CSVOutputOptions csv output specific options
    198 type CSVOutputOptions struct {
    199 	QuoteFields    CSVQuoteFields
    200 	quoteFieldsSet bool
    201 
    202 	RecordDelimiter    string
    203 	recordDelimiterSet bool
    204 
    205 	FieldDelimiter    string
    206 	fieldDelimiterSet bool
    207 
    208 	QuoteCharacter    string
    209 	quoteCharacterSet bool
    210 
    211 	QuoteEscapeCharacter    string
    212 	quoteEscapeCharacterSet bool
    213 }
    214 
    215 // SetQuoteFields sets the quote field parameter in the CSV output options
    216 func (c *CSVOutputOptions) SetQuoteFields(val CSVQuoteFields) {
    217 	c.QuoteFields = val
    218 	c.quoteFieldsSet = true
    219 }
    220 
    221 // SetRecordDelimiter sets the record delimiter character in the CSV output options
    222 func (c *CSVOutputOptions) SetRecordDelimiter(val string) {
    223 	c.RecordDelimiter = val
    224 	c.recordDelimiterSet = true
    225 }
    226 
    227 // SetFieldDelimiter sets the field delimiter character in the CSV output options
    228 func (c *CSVOutputOptions) SetFieldDelimiter(val string) {
    229 	c.FieldDelimiter = val
    230 	c.fieldDelimiterSet = true
    231 }
    232 
    233 // SetQuoteCharacter sets the quote character in the CSV output options
    234 func (c *CSVOutputOptions) SetQuoteCharacter(val string) {
    235 	c.QuoteCharacter = val
    236 	c.quoteCharacterSet = true
    237 }
    238 
    239 // SetQuoteEscapeCharacter sets the quote escape character in the CSV output options
    240 func (c *CSVOutputOptions) SetQuoteEscapeCharacter(val string) {
    241 	c.QuoteEscapeCharacter = val
    242 	c.quoteEscapeCharacterSet = true
    243 }
    244 
    245 // MarshalXML - produces the xml representation of the CSVOutputOptions struct
    246 func (c CSVOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
    247 	if err := e.EncodeToken(start); err != nil {
    248 		return err
    249 	}
    250 
    251 	if c.QuoteFields != "" || c.quoteFieldsSet {
    252 		if err := e.EncodeElement(c.QuoteFields, xml.StartElement{Name: xml.Name{Local: "QuoteFields"}}); err != nil {
    253 			return err
    254 		}
    255 	}
    256 
    257 	if c.RecordDelimiter != "" || c.recordDelimiterSet {
    258 		if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil {
    259 			return err
    260 		}
    261 	}
    262 
    263 	if c.FieldDelimiter != "" || c.fieldDelimiterSet {
    264 		if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil {
    265 			return err
    266 		}
    267 	}
    268 
    269 	if c.QuoteCharacter != "" || c.quoteCharacterSet {
    270 		if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil {
    271 			return err
    272 		}
    273 	}
    274 
    275 	if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet {
    276 		if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil {
    277 			return err
    278 		}
    279 	}
    280 
    281 	return e.EncodeToken(xml.EndElement{Name: start.Name})
    282 }
    283 
    284 // JSONInputOptions json input specific options
    285 type JSONInputOptions struct {
    286 	Type    JSONType
    287 	typeSet bool
    288 }
    289 
    290 // SetType sets the JSON type in the JSON input options
    291 func (j *JSONInputOptions) SetType(typ JSONType) {
    292 	j.Type = typ
    293 	j.typeSet = true
    294 }
    295 
    296 // MarshalXML - produces the xml representation of the JSONInputOptions struct
    297 func (j JSONInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
    298 	if err := e.EncodeToken(start); err != nil {
    299 		return err
    300 	}
    301 
    302 	if j.Type != "" || j.typeSet {
    303 		if err := e.EncodeElement(j.Type, xml.StartElement{Name: xml.Name{Local: "Type"}}); err != nil {
    304 			return err
    305 		}
    306 	}
    307 
    308 	return e.EncodeToken(xml.EndElement{Name: start.Name})
    309 }
    310 
    311 // JSONOutputOptions - json output specific options
    312 type JSONOutputOptions struct {
    313 	RecordDelimiter    string
    314 	recordDelimiterSet bool
    315 }
    316 
    317 // SetRecordDelimiter sets the record delimiter in the JSON output options
    318 func (j *JSONOutputOptions) SetRecordDelimiter(val string) {
    319 	j.RecordDelimiter = val
    320 	j.recordDelimiterSet = true
    321 }
    322 
    323 // MarshalXML - produces the xml representation of the JSONOutputOptions struct
    324 func (j JSONOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
    325 	if err := e.EncodeToken(start); err != nil {
    326 		return err
    327 	}
    328 
    329 	if j.RecordDelimiter != "" || j.recordDelimiterSet {
    330 		if err := e.EncodeElement(j.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil {
    331 			return err
    332 		}
    333 	}
    334 
    335 	return e.EncodeToken(xml.EndElement{Name: start.Name})
    336 }
    337 
    338 // SelectObjectInputSerialization - input serialization parameters
    339 type SelectObjectInputSerialization struct {
    340 	CompressionType SelectCompressionType `xml:"CompressionType,omitempty"`
    341 	Parquet         *ParquetInputOptions  `xml:"Parquet,omitempty"`
    342 	CSV             *CSVInputOptions      `xml:"CSV,omitempty"`
    343 	JSON            *JSONInputOptions     `xml:"JSON,omitempty"`
    344 }
    345 
    346 // SelectObjectOutputSerialization - output serialization parameters.
    347 type SelectObjectOutputSerialization struct {
    348 	CSV  *CSVOutputOptions  `xml:"CSV,omitempty"`
    349 	JSON *JSONOutputOptions `xml:"JSON,omitempty"`
    350 }
    351 
    352 // SelectObjectOptions - represents the input select body
    353 type SelectObjectOptions struct {
    354 	XMLName              xml.Name           `xml:"SelectObjectContentRequest" json:"-"`
    355 	ServerSideEncryption encrypt.ServerSide `xml:"-"`
    356 	Expression           string
    357 	ExpressionType       QueryExpressionType
    358 	InputSerialization   SelectObjectInputSerialization
    359 	OutputSerialization  SelectObjectOutputSerialization
    360 	RequestProgress      struct {
    361 		Enabled bool
    362 	}
    363 }
    364 
    365 // Header returns the http.Header representation of the SelectObject options.
    366 func (o SelectObjectOptions) Header() http.Header {
    367 	headers := make(http.Header)
    368 	if o.ServerSideEncryption != nil && o.ServerSideEncryption.Type() == encrypt.SSEC {
    369 		o.ServerSideEncryption.Marshal(headers)
    370 	}
    371 	return headers
    372 }
    373 
    374 // SelectObjectType - is the parameter which defines what type of object the
    375 // operation is being performed on.
    376 type SelectObjectType string
    377 
    378 // Constants for input data types.
    379 const (
    380 	SelectObjectTypeCSV     SelectObjectType = "CSV"
    381 	SelectObjectTypeJSON    SelectObjectType = "JSON"
    382 	SelectObjectTypeParquet SelectObjectType = "Parquet"
    383 )
    384 
    385 // preludeInfo is used for keeping track of necessary information from the
    386 // prelude.
    387 type preludeInfo struct {
    388 	totalLen  uint32
    389 	headerLen uint32
    390 }
    391 
    392 // SelectResults is used for the streaming responses from the server.
    393 type SelectResults struct {
    394 	pipeReader *io.PipeReader
    395 	resp       *http.Response
    396 	stats      *StatsMessage
    397 	progress   *ProgressMessage
    398 }
    399 
    400 // ProgressMessage is a struct for progress xml message.
    401 type ProgressMessage struct {
    402 	XMLName xml.Name `xml:"Progress" json:"-"`
    403 	StatsMessage
    404 }
    405 
    406 // StatsMessage is a struct for stat xml message.
    407 type StatsMessage struct {
    408 	XMLName        xml.Name `xml:"Stats" json:"-"`
    409 	BytesScanned   int64
    410 	BytesProcessed int64
    411 	BytesReturned  int64
    412 }
    413 
    414 // messageType represents the type of message.
    415 type messageType string
    416 
    417 const (
    418 	errorMsg  messageType = "error"
    419 	commonMsg messageType = "event"
    420 )
    421 
    422 // eventType represents the type of event.
    423 type eventType string
    424 
    425 // list of event-types returned by Select API.
    426 const (
    427 	endEvent      eventType = "End"
    428 	recordsEvent  eventType = "Records"
    429 	progressEvent eventType = "Progress"
    430 	statsEvent    eventType = "Stats"
    431 )
    432 
    433 // contentType represents content type of event.
    434 type contentType string
    435 
    436 const (
    437 	xmlContent contentType = "text/xml"
    438 )
    439 
    440 // SelectObjectContent is a implementation of http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html AWS S3 API.
    441 func (c *Client) SelectObjectContent(ctx context.Context, bucketName, objectName string, opts SelectObjectOptions) (*SelectResults, error) {
    442 	// Input validation.
    443 	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
    444 		return nil, err
    445 	}
    446 	if err := s3utils.CheckValidObjectName(objectName); err != nil {
    447 		return nil, err
    448 	}
    449 
    450 	selectReqBytes, err := xml.Marshal(opts)
    451 	if err != nil {
    452 		return nil, err
    453 	}
    454 
    455 	urlValues := make(url.Values)
    456 	urlValues.Set("select", "")
    457 	urlValues.Set("select-type", "2")
    458 
    459 	// Execute POST on bucket/object.
    460 	resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{
    461 		bucketName:       bucketName,
    462 		objectName:       objectName,
    463 		queryValues:      urlValues,
    464 		customHeader:     opts.Header(),
    465 		contentMD5Base64: sumMD5Base64(selectReqBytes),
    466 		contentSHA256Hex: sum256Hex(selectReqBytes),
    467 		contentBody:      bytes.NewReader(selectReqBytes),
    468 		contentLength:    int64(len(selectReqBytes)),
    469 	})
    470 	if err != nil {
    471 		return nil, err
    472 	}
    473 
    474 	return NewSelectResults(resp, bucketName)
    475 }
    476 
    477 // NewSelectResults creates a Select Result parser that parses the response
    478 // and returns a Reader that will return parsed and assembled select output.
    479 func NewSelectResults(resp *http.Response, bucketName string) (*SelectResults, error) {
    480 	if resp.StatusCode != http.StatusOK {
    481 		return nil, httpRespToErrorResponse(resp, bucketName, "")
    482 	}
    483 
    484 	pipeReader, pipeWriter := io.Pipe()
    485 	streamer := &SelectResults{
    486 		resp:       resp,
    487 		stats:      &StatsMessage{},
    488 		progress:   &ProgressMessage{},
    489 		pipeReader: pipeReader,
    490 	}
    491 	streamer.start(pipeWriter)
    492 	return streamer, nil
    493 }
    494 
    495 // Close - closes the underlying response body and the stream reader.
    496 func (s *SelectResults) Close() error {
    497 	defer closeResponse(s.resp)
    498 	return s.pipeReader.Close()
    499 }
    500 
    501 // Read - is a reader compatible implementation for SelectObjectContent records.
    502 func (s *SelectResults) Read(b []byte) (n int, err error) {
    503 	return s.pipeReader.Read(b)
    504 }
    505 
    506 // Stats - information about a request's stats when processing is complete.
    507 func (s *SelectResults) Stats() *StatsMessage {
    508 	return s.stats
    509 }
    510 
    511 // Progress - information about the progress of a request.
    512 func (s *SelectResults) Progress() *ProgressMessage {
    513 	return s.progress
    514 }
    515 
    516 // start is the main function that decodes the large byte array into
    517 // several events that are sent through the eventstream.
    518 func (s *SelectResults) start(pipeWriter *io.PipeWriter) {
    519 	go func() {
    520 		for {
    521 			var prelude preludeInfo
    522 			headers := make(http.Header)
    523 			var err error
    524 
    525 			// Create CRC code
    526 			crc := crc32.New(crc32.IEEETable)
    527 			crcReader := io.TeeReader(s.resp.Body, crc)
    528 
    529 			// Extract the prelude(12 bytes) into a struct to extract relevant information.
    530 			prelude, err = processPrelude(crcReader, crc)
    531 			if err != nil {
    532 				pipeWriter.CloseWithError(err)
    533 				closeResponse(s.resp)
    534 				return
    535 			}
    536 
    537 			// Extract the headers(variable bytes) into a struct to extract relevant information
    538 			if prelude.headerLen > 0 {
    539 				if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil {
    540 					pipeWriter.CloseWithError(err)
    541 					closeResponse(s.resp)
    542 					return
    543 				}
    544 			}
    545 
    546 			// Get the actual payload length so that the appropriate amount of
    547 			// bytes can be read or parsed.
    548 			payloadLen := prelude.PayloadLen()
    549 
    550 			m := messageType(headers.Get("message-type"))
    551 
    552 			switch m {
    553 			case errorMsg:
    554 				pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\""))
    555 				closeResponse(s.resp)
    556 				return
    557 			case commonMsg:
    558 				// Get content-type of the payload.
    559 				c := contentType(headers.Get("content-type"))
    560 
    561 				// Get event type of the payload.
    562 				e := eventType(headers.Get("event-type"))
    563 
    564 				// Handle all supported events.
    565 				switch e {
    566 				case endEvent:
    567 					pipeWriter.Close()
    568 					closeResponse(s.resp)
    569 					return
    570 				case recordsEvent:
    571 					if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil {
    572 						pipeWriter.CloseWithError(err)
    573 						closeResponse(s.resp)
    574 						return
    575 					}
    576 				case progressEvent:
    577 					switch c {
    578 					case xmlContent:
    579 						if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil {
    580 							pipeWriter.CloseWithError(err)
    581 							closeResponse(s.resp)
    582 							return
    583 						}
    584 					default:
    585 						pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent))
    586 						closeResponse(s.resp)
    587 						return
    588 					}
    589 				case statsEvent:
    590 					switch c {
    591 					case xmlContent:
    592 						if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil {
    593 							pipeWriter.CloseWithError(err)
    594 							closeResponse(s.resp)
    595 							return
    596 						}
    597 					default:
    598 						pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent))
    599 						closeResponse(s.resp)
    600 						return
    601 					}
    602 				}
    603 			}
    604 
    605 			// Ensures that the full message's CRC is correct and
    606 			// that the message is not corrupted
    607 			if err := checkCRC(s.resp.Body, crc.Sum32()); err != nil {
    608 				pipeWriter.CloseWithError(err)
    609 				closeResponse(s.resp)
    610 				return
    611 			}
    612 
    613 		}
    614 	}()
    615 }
    616 
    617 // PayloadLen is a function that calculates the length of the payload.
    618 func (p preludeInfo) PayloadLen() int64 {
    619 	return int64(p.totalLen - p.headerLen - 16)
    620 }
    621 
    622 // processPrelude is the function that reads the 12 bytes of the prelude and
    623 // ensures the CRC is correct while also extracting relevant information into
    624 // the struct,
    625 func processPrelude(prelude io.Reader, crc hash.Hash32) (preludeInfo, error) {
    626 	var err error
    627 	pInfo := preludeInfo{}
    628 
    629 	// reads total length of the message (first 4 bytes)
    630 	pInfo.totalLen, err = extractUint32(prelude)
    631 	if err != nil {
    632 		return pInfo, err
    633 	}
    634 
    635 	// reads total header length of the message (2nd 4 bytes)
    636 	pInfo.headerLen, err = extractUint32(prelude)
    637 	if err != nil {
    638 		return pInfo, err
    639 	}
    640 
    641 	// checks that the CRC is correct (3rd 4 bytes)
    642 	preCRC := crc.Sum32()
    643 	if err := checkCRC(prelude, preCRC); err != nil {
    644 		return pInfo, err
    645 	}
    646 
    647 	return pInfo, nil
    648 }
    649 
    650 // extracts the relevant information from the Headers.
    651 func extractHeader(body io.Reader, myHeaders http.Header) error {
    652 	for {
    653 		// extracts the first part of the header,
    654 		headerTypeName, err := extractHeaderType(body)
    655 		if err != nil {
    656 			// Since end of file, we have read all of our headers
    657 			if err == io.EOF {
    658 				break
    659 			}
    660 			return err
    661 		}
    662 
    663 		// reads the 7 present in the header and ignores it.
    664 		extractUint8(body)
    665 
    666 		headerValueName, err := extractHeaderValue(body)
    667 		if err != nil {
    668 			return err
    669 		}
    670 
    671 		myHeaders.Set(headerTypeName, headerValueName)
    672 
    673 	}
    674 	return nil
    675 }
    676 
    677 // extractHeaderType extracts the first half of the header message, the header type.
    678 func extractHeaderType(body io.Reader) (string, error) {
    679 	// extracts 2 bit integer
    680 	headerNameLen, err := extractUint8(body)
    681 	if err != nil {
    682 		return "", err
    683 	}
    684 	// extracts the string with the appropriate number of bytes
    685 	headerName, err := extractString(body, int(headerNameLen))
    686 	if err != nil {
    687 		return "", err
    688 	}
    689 	return strings.TrimPrefix(headerName, ":"), nil
    690 }
    691 
    692 // extractsHeaderValue extracts the second half of the header message, the
    693 // header value
    694 func extractHeaderValue(body io.Reader) (string, error) {
    695 	bodyLen, err := extractUint16(body)
    696 	if err != nil {
    697 		return "", err
    698 	}
    699 	bodyName, err := extractString(body, int(bodyLen))
    700 	if err != nil {
    701 		return "", err
    702 	}
    703 	return bodyName, nil
    704 }
    705 
    706 // extracts a string from byte array of a particular number of bytes.
    707 func extractString(source io.Reader, lenBytes int) (string, error) {
    708 	myVal := make([]byte, lenBytes)
    709 	_, err := source.Read(myVal)
    710 	if err != nil {
    711 		return "", err
    712 	}
    713 	return string(myVal), nil
    714 }
    715 
    716 // extractUint32 extracts a 4 byte integer from the byte array.
    717 func extractUint32(r io.Reader) (uint32, error) {
    718 	buf := make([]byte, 4)
    719 	_, err := readFull(r, buf)
    720 	if err != nil {
    721 		return 0, err
    722 	}
    723 	return binary.BigEndian.Uint32(buf), nil
    724 }
    725 
    726 // extractUint16 extracts a 2 byte integer from the byte array.
    727 func extractUint16(r io.Reader) (uint16, error) {
    728 	buf := make([]byte, 2)
    729 	_, err := readFull(r, buf)
    730 	if err != nil {
    731 		return 0, err
    732 	}
    733 	return binary.BigEndian.Uint16(buf), nil
    734 }
    735 
    736 // extractUint8 extracts a 1 byte integer from the byte array.
    737 func extractUint8(r io.Reader) (uint8, error) {
    738 	buf := make([]byte, 1)
    739 	_, err := readFull(r, buf)
    740 	if err != nil {
    741 		return 0, err
    742 	}
    743 	return buf[0], nil
    744 }
    745 
    746 // checkCRC ensures that the CRC matches with the one from the reader.
    747 func checkCRC(r io.Reader, expect uint32) error {
    748 	msgCRC, err := extractUint32(r)
    749 	if err != nil {
    750 		return err
    751 	}
    752 
    753 	if msgCRC != expect {
    754 		return fmt.Errorf("Checksum Mismatch, MessageCRC of 0x%X does not equal expected CRC of 0x%X", msgCRC, expect)
    755 	}
    756 	return nil
    757 }