gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

rpc_util.go (29764B)


      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package grpc
     20 
     21 import (
     22 	"bytes"
     23 	"compress/gzip"
     24 	"context"
     25 	"encoding/binary"
     26 	"fmt"
     27 	"io"
     28 	"math"
     29 	"strings"
     30 	"sync"
     31 	"time"
     32 
     33 	"google.golang.org/grpc/codes"
     34 	"google.golang.org/grpc/credentials"
     35 	"google.golang.org/grpc/encoding"
     36 	"google.golang.org/grpc/encoding/proto"
     37 	"google.golang.org/grpc/internal/transport"
     38 	"google.golang.org/grpc/metadata"
     39 	"google.golang.org/grpc/peer"
     40 	"google.golang.org/grpc/stats"
     41 	"google.golang.org/grpc/status"
     42 )
     43 
     44 // Compressor defines the interface gRPC uses to compress a message.
     45 //
     46 // Deprecated: use package encoding.
     47 type Compressor interface {
     48 	// Do compresses p into w.
     49 	Do(w io.Writer, p []byte) error
     50 	// Type returns the compression algorithm the Compressor uses.
     51 	Type() string
     52 }
     53 
     54 type gzipCompressor struct {
     55 	pool sync.Pool
     56 }
     57 
     58 // NewGZIPCompressor creates a Compressor based on GZIP.
     59 //
     60 // Deprecated: use package encoding/gzip.
     61 func NewGZIPCompressor() Compressor {
     62 	c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
     63 	return c
     64 }
     65 
     66 // NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
     67 // of assuming DefaultCompression.
     68 //
     69 // The error returned will be nil if the level is valid.
     70 //
     71 // Deprecated: use package encoding/gzip.
     72 func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
     73 	if level < gzip.DefaultCompression || level > gzip.BestCompression {
     74 		return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
     75 	}
     76 	return &gzipCompressor{
     77 		pool: sync.Pool{
     78 			New: func() interface{} {
     79 				w, err := gzip.NewWriterLevel(io.Discard, level)
     80 				if err != nil {
     81 					panic(err)
     82 				}
     83 				return w
     84 			},
     85 		},
     86 	}, nil
     87 }
     88 
     89 func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
     90 	z := c.pool.Get().(*gzip.Writer)
     91 	defer c.pool.Put(z)
     92 	z.Reset(w)
     93 	if _, err := z.Write(p); err != nil {
     94 		return err
     95 	}
     96 	return z.Close()
     97 }
     98 
     99 func (c *gzipCompressor) Type() string {
    100 	return "gzip"
    101 }
    102 
    103 // Decompressor defines the interface gRPC uses to decompress a message.
    104 //
    105 // Deprecated: use package encoding.
    106 type Decompressor interface {
    107 	// Do reads the data from r and uncompress them.
    108 	Do(r io.Reader) ([]byte, error)
    109 	// Type returns the compression algorithm the Decompressor uses.
    110 	Type() string
    111 }
    112 
    113 type gzipDecompressor struct {
    114 	pool sync.Pool
    115 }
    116 
    117 // NewGZIPDecompressor creates a Decompressor based on GZIP.
    118 //
    119 // Deprecated: use package encoding/gzip.
    120 func NewGZIPDecompressor() Decompressor {
    121 	return &gzipDecompressor{}
    122 }
    123 
    124 func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
    125 	var z *gzip.Reader
    126 	switch maybeZ := d.pool.Get().(type) {
    127 	case nil:
    128 		newZ, err := gzip.NewReader(r)
    129 		if err != nil {
    130 			return nil, err
    131 		}
    132 		z = newZ
    133 	case *gzip.Reader:
    134 		z = maybeZ
    135 		if err := z.Reset(r); err != nil {
    136 			d.pool.Put(z)
    137 			return nil, err
    138 		}
    139 	}
    140 
    141 	defer func() {
    142 		z.Close()
    143 		d.pool.Put(z)
    144 	}()
    145 	return io.ReadAll(z)
    146 }
    147 
    148 func (d *gzipDecompressor) Type() string {
    149 	return "gzip"
    150 }
    151 
// callInfo contains all related configuration and information about an RPC.
// Its fields are filled in by the before hooks of the CallOptions applied to
// the call.
type callInfo struct {
	// compressorType is the compressor name set by CompressorCallOption.
	compressorType        string
	// failFast is set by FailFastCallOption; defaultCallInfo sets it to true.
	failFast              bool
	// maxReceiveMessageSize is set by MaxRecvMsgSizeCallOption; nil until then.
	maxReceiveMessageSize *int
	// maxSendMessageSize is set by MaxSendMsgSizeCallOption; nil until then.
	maxSendMessageSize    *int
	// creds holds the per-RPC credentials set by PerRPCCredsCallOption.
	creds                 credentials.PerRPCCredentials
	// contentSubtype is the content-subtype set by ContentSubtypeCallOption.
	contentSubtype        string
	// codec is set by ForceCodecCallOption or CustomCodecCallOption.
	codec                 baseCodec
	// maxRetryRPCBufferSize is set by MaxRetryRPCBufferSizeCallOption;
	// defaultCallInfo sets it to 256KB.
	maxRetryRPCBufferSize int
	// onFinish accumulates the callbacks registered via OnFinishCallOption.
	onFinish              []func(err error)
}
    164 
    165 func defaultCallInfo() *callInfo {
    166 	return &callInfo{
    167 		failFast:              true,
    168 		maxRetryRPCBufferSize: 256 * 1024, // 256KB
    169 	}
    170 }
    171 
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes. Both methods are unexported, so implementations
// outside this package must embed an existing CallOption such as
// EmptyCallOption.
type CallOption interface {
	// before is called before the call is sent to any server.  If before
	// returns a non-nil error, the RPC fails with that error.
	before(*callInfo) error

	// after is called after the call has completed.  after cannot return an
	// error, so any failures should be reported via output parameters.
	after(*callInfo, *csAttempt)
}
    183 
    184 // EmptyCallOption does not alter the Call configuration.
    185 // It can be embedded in another structure to carry satellite data for use
    186 // by interceptors.
    187 type EmptyCallOption struct{}
    188 
    189 func (EmptyCallOption) before(*callInfo) error      { return nil }
    190 func (EmptyCallOption) after(*callInfo, *csAttempt) {}
    191 
    192 // Header returns a CallOptions that retrieves the header metadata
    193 // for a unary RPC.
    194 func Header(md *metadata.MD) CallOption {
    195 	return HeaderCallOption{HeaderAddr: md}
    196 }
    197 
    198 // HeaderCallOption is a CallOption for collecting response header metadata.
    199 // The metadata field will be populated *after* the RPC completes.
    200 //
    201 // # Experimental
    202 //
    203 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    204 // later release.
    205 type HeaderCallOption struct {
    206 	HeaderAddr *metadata.MD
    207 }
    208 
    209 func (o HeaderCallOption) before(c *callInfo) error { return nil }
    210 func (o HeaderCallOption) after(c *callInfo, attempt *csAttempt) {
    211 	*o.HeaderAddr, _ = attempt.s.Header()
    212 }
    213 
    214 // Trailer returns a CallOptions that retrieves the trailer metadata
    215 // for a unary RPC.
    216 func Trailer(md *metadata.MD) CallOption {
    217 	return TrailerCallOption{TrailerAddr: md}
    218 }
    219 
    220 // TrailerCallOption is a CallOption for collecting response trailer metadata.
    221 // The metadata field will be populated *after* the RPC completes.
    222 //
    223 // # Experimental
    224 //
    225 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    226 // later release.
    227 type TrailerCallOption struct {
    228 	TrailerAddr *metadata.MD
    229 }
    230 
    231 func (o TrailerCallOption) before(c *callInfo) error { return nil }
    232 func (o TrailerCallOption) after(c *callInfo, attempt *csAttempt) {
    233 	*o.TrailerAddr = attempt.s.Trailer()
    234 }
    235 
    236 // Peer returns a CallOption that retrieves peer information for a unary RPC.
    237 // The peer field will be populated *after* the RPC completes.
    238 func Peer(p *peer.Peer) CallOption {
    239 	return PeerCallOption{PeerAddr: p}
    240 }
    241 
    242 // PeerCallOption is a CallOption for collecting the identity of the remote
    243 // peer. The peer field will be populated *after* the RPC completes.
    244 //
    245 // # Experimental
    246 //
    247 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    248 // later release.
    249 type PeerCallOption struct {
    250 	PeerAddr *peer.Peer
    251 }
    252 
    253 func (o PeerCallOption) before(c *callInfo) error { return nil }
    254 func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) {
    255 	if x, ok := peer.FromContext(attempt.s.Context()); ok {
    256 		*o.PeerAddr = *x
    257 	}
    258 }
    259 
    260 // WaitForReady configures the action to take when an RPC is attempted on broken
    261 // connections or unreachable servers. If waitForReady is false and the
    262 // connection is in the TRANSIENT_FAILURE state, the RPC will fail
    263 // immediately. Otherwise, the RPC client will block the call until a
    264 // connection is available (or the call is canceled or times out) and will
    265 // retry the call if it fails due to a transient error.  gRPC will not retry if
    266 // data was written to the wire unless the server indicates it did not process
    267 // the data.  Please refer to
    268 // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
    269 //
    270 // By default, RPCs don't "wait for ready".
    271 func WaitForReady(waitForReady bool) CallOption {
    272 	return FailFastCallOption{FailFast: !waitForReady}
    273 }
    274 
    275 // FailFast is the opposite of WaitForReady.
    276 //
    277 // Deprecated: use WaitForReady.
    278 func FailFast(failFast bool) CallOption {
    279 	return FailFastCallOption{FailFast: failFast}
    280 }
    281 
    282 // FailFastCallOption is a CallOption for indicating whether an RPC should fail
    283 // fast or not.
    284 //
    285 // # Experimental
    286 //
    287 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    288 // later release.
    289 type FailFastCallOption struct {
    290 	FailFast bool
    291 }
    292 
    293 func (o FailFastCallOption) before(c *callInfo) error {
    294 	c.failFast = o.FailFast
    295 	return nil
    296 }
    297 func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}
    298 
    299 // OnFinish returns a CallOption that configures a callback to be called when
    300 // the call completes. The error passed to the callback is the status of the
    301 // RPC, and may be nil. The onFinish callback provided will only be called once
    302 // by gRPC. This is mainly used to be used by streaming interceptors, to be
    303 // notified when the RPC completes along with information about the status of
    304 // the RPC.
    305 //
    306 // # Experimental
    307 //
    308 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
    309 // later release.
    310 func OnFinish(onFinish func(err error)) CallOption {
    311 	return OnFinishCallOption{
    312 		OnFinish: onFinish,
    313 	}
    314 }
    315 
    316 // OnFinishCallOption is CallOption that indicates a callback to be called when
    317 // the call completes.
    318 //
    319 // # Experimental
    320 //
    321 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    322 // later release.
    323 type OnFinishCallOption struct {
    324 	OnFinish func(error)
    325 }
    326 
    327 func (o OnFinishCallOption) before(c *callInfo) error {
    328 	c.onFinish = append(c.onFinish, o.OnFinish)
    329 	return nil
    330 }
    331 
    332 func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {}
    333 
    334 // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
    335 // in bytes the client can receive. If this is not set, gRPC uses the default
    336 // 4MB.
    337 func MaxCallRecvMsgSize(bytes int) CallOption {
    338 	return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: bytes}
    339 }
    340 
    341 // MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
    342 // size in bytes the client can receive.
    343 //
    344 // # Experimental
    345 //
    346 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    347 // later release.
    348 type MaxRecvMsgSizeCallOption struct {
    349 	MaxRecvMsgSize int
    350 }
    351 
    352 func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
    353 	c.maxReceiveMessageSize = &o.MaxRecvMsgSize
    354 	return nil
    355 }
    356 func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
    357 
    358 // MaxCallSendMsgSize returns a CallOption which sets the maximum message size
    359 // in bytes the client can send. If this is not set, gRPC uses the default
    360 // `math.MaxInt32`.
    361 func MaxCallSendMsgSize(bytes int) CallOption {
    362 	return MaxSendMsgSizeCallOption{MaxSendMsgSize: bytes}
    363 }
    364 
    365 // MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
    366 // size in bytes the client can send.
    367 //
    368 // # Experimental
    369 //
    370 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    371 // later release.
    372 type MaxSendMsgSizeCallOption struct {
    373 	MaxSendMsgSize int
    374 }
    375 
    376 func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
    377 	c.maxSendMessageSize = &o.MaxSendMsgSize
    378 	return nil
    379 }
    380 func (o MaxSendMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
    381 
    382 // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
    383 // for a call.
    384 func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
    385 	return PerRPCCredsCallOption{Creds: creds}
    386 }
    387 
    388 // PerRPCCredsCallOption is a CallOption that indicates the per-RPC
    389 // credentials to use for the call.
    390 //
    391 // # Experimental
    392 //
    393 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    394 // later release.
    395 type PerRPCCredsCallOption struct {
    396 	Creds credentials.PerRPCCredentials
    397 }
    398 
    399 func (o PerRPCCredsCallOption) before(c *callInfo) error {
    400 	c.creds = o.Creds
    401 	return nil
    402 }
    403 func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {}
    404 
    405 // UseCompressor returns a CallOption which sets the compressor used when
    406 // sending the request.  If WithCompressor is also set, UseCompressor has
    407 // higher priority.
    408 //
    409 // # Experimental
    410 //
    411 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
    412 // later release.
    413 func UseCompressor(name string) CallOption {
    414 	return CompressorCallOption{CompressorType: name}
    415 }
    416 
    417 // CompressorCallOption is a CallOption that indicates the compressor to use.
    418 //
    419 // # Experimental
    420 //
    421 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    422 // later release.
    423 type CompressorCallOption struct {
    424 	CompressorType string
    425 }
    426 
    427 func (o CompressorCallOption) before(c *callInfo) error {
    428 	c.compressorType = o.CompressorType
    429 	return nil
    430 }
    431 func (o CompressorCallOption) after(c *callInfo, attempt *csAttempt) {}
    432 
    433 // CallContentSubtype returns a CallOption that will set the content-subtype
    434 // for a call. For example, if content-subtype is "json", the Content-Type over
    435 // the wire will be "application/grpc+json". The content-subtype is converted
    436 // to lowercase before being included in Content-Type. See Content-Type on
    437 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
    438 // more details.
    439 //
    440 // If ForceCodec is not also used, the content-subtype will be used to look up
    441 // the Codec to use in the registry controlled by RegisterCodec. See the
    442 // documentation on RegisterCodec for details on registration. The lookup of
    443 // content-subtype is case-insensitive. If no such Codec is found, the call
    444 // will result in an error with code codes.Internal.
    445 //
    446 // If ForceCodec is also used, that Codec will be used for all request and
    447 // response messages, with the content-subtype set to the given contentSubtype
    448 // here for requests.
    449 func CallContentSubtype(contentSubtype string) CallOption {
    450 	return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
    451 }
    452 
    453 // ContentSubtypeCallOption is a CallOption that indicates the content-subtype
    454 // used for marshaling messages.
    455 //
    456 // # Experimental
    457 //
    458 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    459 // later release.
    460 type ContentSubtypeCallOption struct {
    461 	ContentSubtype string
    462 }
    463 
    464 func (o ContentSubtypeCallOption) before(c *callInfo) error {
    465 	c.contentSubtype = o.ContentSubtype
    466 	return nil
    467 }
    468 func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
    469 
    470 // ForceCodec returns a CallOption that will set codec to be used for all
    471 // request and response messages for a call. The result of calling Name() will
    472 // be used as the content-subtype after converting to lowercase, unless
    473 // CallContentSubtype is also used.
    474 //
    475 // See Content-Type on
    476 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
    477 // more details. Also see the documentation on RegisterCodec and
    478 // CallContentSubtype for more details on the interaction between Codec and
    479 // content-subtype.
    480 //
    481 // This function is provided for advanced users; prefer to use only
    482 // CallContentSubtype to select a registered codec instead.
    483 //
    484 // # Experimental
    485 //
    486 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
    487 // later release.
    488 func ForceCodec(codec encoding.Codec) CallOption {
    489 	return ForceCodecCallOption{Codec: codec}
    490 }
    491 
    492 // ForceCodecCallOption is a CallOption that indicates the codec used for
    493 // marshaling messages.
    494 //
    495 // # Experimental
    496 //
    497 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    498 // later release.
    499 type ForceCodecCallOption struct {
    500 	Codec encoding.Codec
    501 }
    502 
    503 func (o ForceCodecCallOption) before(c *callInfo) error {
    504 	c.codec = o.Codec
    505 	return nil
    506 }
    507 func (o ForceCodecCallOption) after(c *callInfo, attempt *csAttempt) {}
    508 
    509 // CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of
    510 // an encoding.Codec.
    511 //
    512 // Deprecated: use ForceCodec instead.
    513 func CallCustomCodec(codec Codec) CallOption {
    514 	return CustomCodecCallOption{Codec: codec}
    515 }
    516 
    517 // CustomCodecCallOption is a CallOption that indicates the codec used for
    518 // marshaling messages.
    519 //
    520 // # Experimental
    521 //
    522 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    523 // later release.
    524 type CustomCodecCallOption struct {
    525 	Codec Codec
    526 }
    527 
    528 func (o CustomCodecCallOption) before(c *callInfo) error {
    529 	c.codec = o.Codec
    530 	return nil
    531 }
    532 func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {}
    533 
    534 // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
    535 // used for buffering this RPC's requests for retry purposes.
    536 //
    537 // # Experimental
    538 //
    539 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
    540 // later release.
    541 func MaxRetryRPCBufferSize(bytes int) CallOption {
    542 	return MaxRetryRPCBufferSizeCallOption{bytes}
    543 }
    544 
    545 // MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
    546 // memory to be used for caching this RPC for retry purposes.
    547 //
    548 // # Experimental
    549 //
    550 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
    551 // later release.
    552 type MaxRetryRPCBufferSizeCallOption struct {
    553 	MaxRetryRPCBufferSize int
    554 }
    555 
    556 func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
    557 	c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
    558 	return nil
    559 }
    560 func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
    561 
// payloadFormat is the first byte of a gRPC message header; it says whether
// the payload that follows is compressed.
type payloadFormat uint8

const (
	compressionNone payloadFormat = 0 // no compression
	compressionMade payloadFormat = 1 // compressed
)
    569 
    570 // parser reads complete gRPC messages from the underlying reader.
    571 type parser struct {
    572 	// r is the underlying reader.
    573 	// See the comment on recvMsg for the permissible
    574 	// error types.
    575 	r io.Reader
    576 
    577 	// The header of a gRPC message. Find more detail at
    578 	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
    579 	header [5]byte
    580 }
    581 
    582 // recvMsg reads a complete gRPC message from the stream.
    583 //
    584 // It returns the message and its payload (compression/encoding)
    585 // format. The caller owns the returned msg memory.
    586 //
    587 // If there is an error, possible values are:
    588 //   - io.EOF, when no messages remain
    589 //   - io.ErrUnexpectedEOF
    590 //   - of type transport.ConnectionError
    591 //   - an error from the status package
    592 //
    593 // No other error values or types must be returned, which also means
    594 // that the underlying io.Reader must not return an incompatible
    595 // error.
    596 func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
    597 	if _, err := p.r.Read(p.header[:]); err != nil {
    598 		return 0, nil, err
    599 	}
    600 
    601 	pf = payloadFormat(p.header[0])
    602 	length := binary.BigEndian.Uint32(p.header[1:])
    603 
    604 	if length == 0 {
    605 		return pf, nil, nil
    606 	}
    607 	if int64(length) > int64(maxInt) {
    608 		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
    609 	}
    610 	if int(length) > maxReceiveMessageSize {
    611 		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
    612 	}
    613 	// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
    614 	// of making it for each message:
    615 	msg = make([]byte, int(length))
    616 	if _, err := p.r.Read(msg); err != nil {
    617 		if err == io.EOF {
    618 			err = io.ErrUnexpectedEOF
    619 		}
    620 		return 0, nil, err
    621 	}
    622 	return pf, msg, nil
    623 }
    624 
    625 // encode serializes msg and returns a buffer containing the message, or an
    626 // error if it is too large to be transmitted by grpc.  If msg is nil, it
    627 // generates an empty message.
    628 func encode(c baseCodec, msg interface{}) ([]byte, error) {
    629 	if msg == nil { // NOTE: typed nils will not be caught by this check
    630 		return nil, nil
    631 	}
    632 	b, err := c.Marshal(msg)
    633 	if err != nil {
    634 		return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
    635 	}
    636 	if uint(len(b)) > math.MaxUint32 {
    637 		return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
    638 	}
    639 	return b, nil
    640 }
    641 
    642 // compress returns the input bytes compressed by compressor or cp.  If both
    643 // compressors are nil, returns nil.
    644 //
    645 // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
    646 func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
    647 	if compressor == nil && cp == nil {
    648 		return nil, nil
    649 	}
    650 	wrapErr := func(err error) error {
    651 		return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
    652 	}
    653 	cbuf := &bytes.Buffer{}
    654 	if compressor != nil {
    655 		z, err := compressor.Compress(cbuf)
    656 		if err != nil {
    657 			return nil, wrapErr(err)
    658 		}
    659 		if _, err := z.Write(in); err != nil {
    660 			return nil, wrapErr(err)
    661 		}
    662 		if err := z.Close(); err != nil {
    663 			return nil, wrapErr(err)
    664 		}
    665 	} else {
    666 		if err := cp.Do(cbuf, in); err != nil {
    667 			return nil, wrapErr(err)
    668 		}
    669 	}
    670 	return cbuf.Bytes(), nil
    671 }
    672 
    673 const (
    674 	payloadLen = 1
    675 	sizeLen    = 4
    676 	headerLen  = payloadLen + sizeLen
    677 )
    678 
    679 // msgHeader returns a 5-byte header for the message being transmitted and the
    680 // payload, which is compData if non-nil or data otherwise.
    681 func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
    682 	hdr = make([]byte, headerLen)
    683 	if compData != nil {
    684 		hdr[0] = byte(compressionMade)
    685 		data = compData
    686 	} else {
    687 		hdr[0] = byte(compressionNone)
    688 	}
    689 
    690 	// Write length of payload into buf
    691 	binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data)))
    692 	return hdr, data
    693 }
    694 
    695 func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload {
    696 	return &stats.OutPayload{
    697 		Client:           client,
    698 		Payload:          msg,
    699 		Data:             data,
    700 		Length:           len(data),
    701 		WireLength:       len(payload) + headerLen,
    702 		CompressedLength: len(payload),
    703 		SentTime:         t,
    704 	}
    705 }
    706 
    707 func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
    708 	switch pf {
    709 	case compressionNone:
    710 	case compressionMade:
    711 		if recvCompress == "" || recvCompress == encoding.Identity {
    712 			return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
    713 		}
    714 		if !haveCompressor {
    715 			return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
    716 		}
    717 	default:
    718 		return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
    719 	}
    720 	return nil
    721 }
    722 
// payloadInfo carries size and content details about a received message
// back to the caller of recv.
type payloadInfo struct {
	compressedLength  int // The compressed length got from wire.
	// uncompressedBytes is set by recv to the message bytes that were handed
	// to the codec, after any decompression.
	uncompressedBytes []byte
}
    727 
    728 func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
    729 	pf, d, err := p.recvMsg(maxReceiveMessageSize)
    730 	if err != nil {
    731 		return nil, err
    732 	}
    733 	if payInfo != nil {
    734 		payInfo.compressedLength = len(d)
    735 	}
    736 
    737 	if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
    738 		return nil, st.Err()
    739 	}
    740 
    741 	var size int
    742 	if pf == compressionMade {
    743 		// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
    744 		// use this decompressor as the default.
    745 		if dc != nil {
    746 			d, err = dc.Do(bytes.NewReader(d))
    747 			size = len(d)
    748 		} else {
    749 			d, size, err = decompress(compressor, d, maxReceiveMessageSize)
    750 		}
    751 		if err != nil {
    752 			return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
    753 		}
    754 		if size > maxReceiveMessageSize {
    755 			// TODO: Revisit the error code. Currently keep it consistent with java
    756 			// implementation.
    757 			return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
    758 		}
    759 	}
    760 	return d, nil
    761 }
    762 
    763 // Using compressor, decompress d, returning data and size.
    764 // Optionally, if data will be over maxReceiveMessageSize, just return the size.
    765 func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
    766 	dcReader, err := compressor.Decompress(bytes.NewReader(d))
    767 	if err != nil {
    768 		return nil, 0, err
    769 	}
    770 	if sizer, ok := compressor.(interface {
    771 		DecompressedSize(compressedBytes []byte) int
    772 	}); ok {
    773 		if size := sizer.DecompressedSize(d); size >= 0 {
    774 			if size > maxReceiveMessageSize {
    775 				return nil, size, nil
    776 			}
    777 			// size is used as an estimate to size the buffer, but we
    778 			// will read more data if available.
    779 			// +MinRead so ReadFrom will not reallocate if size is correct.
    780 			buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
    781 			bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
    782 			return buf.Bytes(), int(bytesRead), err
    783 		}
    784 	}
    785 	// Read from LimitReader with limit max+1. So if the underlying
    786 	// reader is over limit, the result will be bigger than max.
    787 	d, err = io.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
    788 	return d, len(d), err
    789 }
    790 
    791 // For the two compressor parameters, both should not be set, but if they are,
    792 // dc takes precedence over compressor.
    793 // TODO(dfawley): wrap the old compressor/decompressor using the new API?
    794 func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
    795 	d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
    796 	if err != nil {
    797 		return err
    798 	}
    799 	if err := c.Unmarshal(d, m); err != nil {
    800 		return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
    801 	}
    802 	if payInfo != nil {
    803 		payInfo.uncompressedBytes = d
    804 	}
    805 	return nil
    806 }
    807 
    808 // Information about RPC
    809 type rpcInfo struct {
    810 	failfast      bool
    811 	preloaderInfo *compressorInfo
    812 }
    813 
// compressorInfo holds the information needed by the Preloader: the codec and
// the compressors of an RPC.
//
// If a stream's context stores an rpcInfo whose compressorInfo has non-nil
// codec and compressors, a prepared message can be used to prepare messages
// asynchronously and to reuse the marshalled bytes.
type compressorInfo struct {
	// codec is the codec supplied for the RPC.
	codec baseCodec
	// cp is a compressor expressed through this package's Compressor
	// interface.
	cp    Compressor
	// comp is a compressor expressed through the encoding package's
	// Compressor interface.
	comp  encoding.Compressor
}
    824 
// rpcInfoContextKey is the context key type under which an *rpcInfo is stored
// (see newContextWithRPCInfo and rpcInfoFromContext).
type rpcInfoContextKey struct{}
    826 
    827 func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
    828 	return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
    829 		failfast: failfast,
    830 		preloaderInfo: &compressorInfo{
    831 			codec: codec,
    832 			cp:    cp,
    833 			comp:  comp,
    834 		},
    835 	})
    836 }
    837 
    838 func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
    839 	s, ok = ctx.Value(rpcInfoContextKey{}).(*rpcInfo)
    840 	return
    841 }
    842 
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
//
// It simply forwards to status.Code.
//
// Deprecated: use status.Code instead.
func Code(err error) codes.Code {
	return status.Code(err)
}
    850 
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
//
// It converts err with status.Convert and returns the resulting Message.
//
// Deprecated: use status.Convert and Message method instead.
func ErrorDesc(err error) string {
	return status.Convert(err).Message()
}
    858 
// Errorf returns an error containing an error code and a description;
// Errorf returns nil if c is OK.
//
// The description is built from format and a; all arguments are forwarded
// unchanged to status.Errorf.
//
// Deprecated: use status.Errorf instead.
func Errorf(c codes.Code, format string, a ...interface{}) error {
	return status.Errorf(c, format, a...)
}
    866 
    867 // toRPCErr converts an error into an error from the status package.
    868 func toRPCErr(err error) error {
    869 	switch err {
    870 	case nil, io.EOF:
    871 		return err
    872 	case context.DeadlineExceeded:
    873 		return status.Error(codes.DeadlineExceeded, err.Error())
    874 	case context.Canceled:
    875 		return status.Error(codes.Canceled, err.Error())
    876 	case io.ErrUnexpectedEOF:
    877 		return status.Error(codes.Internal, err.Error())
    878 	}
    879 
    880 	switch e := err.(type) {
    881 	case transport.ConnectionError:
    882 		return status.Error(codes.Unavailable, e.Desc)
    883 	case *transport.NewStreamError:
    884 		return toRPCErr(e.Err)
    885 	}
    886 
    887 	if _, ok := status.FromError(err); ok {
    888 		return err
    889 	}
    890 
    891 	return status.Error(codes.Unknown, err.Error())
    892 }
    893 
    894 // setCallInfoCodec should only be called after CallOptions have been applied.
    895 func setCallInfoCodec(c *callInfo) error {
    896 	if c.codec != nil {
    897 		// codec was already set by a CallOption; use it, but set the content
    898 		// subtype if it is not set.
    899 		if c.contentSubtype == "" {
    900 			// c.codec is a baseCodec to hide the difference between grpc.Codec and
    901 			// encoding.Codec (Name vs. String method name).  We only support
    902 			// setting content subtype from encoding.Codec to avoid a behavior
    903 			// change with the deprecated version.
    904 			if ec, ok := c.codec.(encoding.Codec); ok {
    905 				c.contentSubtype = strings.ToLower(ec.Name())
    906 			}
    907 		}
    908 		return nil
    909 	}
    910 
    911 	if c.contentSubtype == "" {
    912 		// No codec specified in CallOptions; use proto by default.
    913 		c.codec = encoding.GetCodec(proto.Name)
    914 		return nil
    915 	}
    916 
    917 	// c.contentSubtype is already lowercased in CallContentSubtype
    918 	c.codec = encoding.GetCodec(c.contentSubtype)
    919 	if c.codec == nil {
    920 		return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
    921 	}
    922 	return nil
    923 }
    924 
// channelzData stores channelz-related data for ClientConn, addrConn and
// Server.
//
// These fields cannot be embedded directly in the original structs (e.g.
// ClientConn): to perform atomic operations on an int64 variable on a 32-bit
// machine, the user is responsible for enforcing memory alignment. Grouping
// the int64 fields inside this struct enforces that alignment.
type channelzData struct {
	callsStarted   int64
	callsFailed    int64
	callsSucceeded int64
	// lastCallStartedTime stores the timestamp at which the last call
	// started. It is an int64 rather than a time.Time because atomically
	// updating a time.Time variable is more costly than updating an int64.
	lastCallStartedTime int64
}
    937 
// The SupportPackageIsVersion variables are referenced from generated protocol
// buffer files to ensure compatibility with the gRPC version used.  The latest
// support package version is 7.
//
// Older versions (3 through 6) are kept for compatibility.
//
// These constants should not be referenced from any other code.
const (
	SupportPackageIsVersion3 = true
	SupportPackageIsVersion4 = true
	SupportPackageIsVersion5 = true
	SupportPackageIsVersion6 = true
	SupportPackageIsVersion7 = true
)
    952 
// grpcUA is the string "grpc-go/" followed by Version.
// NOTE(review): presumably the user-agent value identifying this library;
// confirm at the use sites.
const grpcUA = "grpc-go/" + Version