stream.go (53778B)
1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "context" 23 "errors" 24 "io" 25 "math" 26 "strconv" 27 "sync" 28 "time" 29 30 "golang.org/x/net/trace" 31 "google.golang.org/grpc/balancer" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/encoding" 34 "google.golang.org/grpc/internal/balancerload" 35 "google.golang.org/grpc/internal/binarylog" 36 "google.golang.org/grpc/internal/channelz" 37 "google.golang.org/grpc/internal/grpcrand" 38 "google.golang.org/grpc/internal/grpcutil" 39 imetadata "google.golang.org/grpc/internal/metadata" 40 iresolver "google.golang.org/grpc/internal/resolver" 41 "google.golang.org/grpc/internal/serviceconfig" 42 istatus "google.golang.org/grpc/internal/status" 43 "google.golang.org/grpc/internal/transport" 44 "google.golang.org/grpc/metadata" 45 "google.golang.org/grpc/peer" 46 "google.golang.org/grpc/stats" 47 "google.golang.org/grpc/status" 48 ) 49 50 // StreamHandler defines the handler called by gRPC server to complete the 51 // execution of a streaming RPC. 52 // 53 // If a StreamHandler returns an error, it should either be produced by the 54 // status package, or be one of the context errors. Otherwise, gRPC will use 55 // codes.Unknown as the status code and err.Error() as the status message of the 56 // RPC. 
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
	// StreamName and Handler are only used when registering handlers on a
	// server.
	StreamName string        // the name of the method excluding the service
	Handler    StreamHandler // the handler called for the method

	// ServerStreams and ClientStreams are used for registering handlers on a
	// server as well as defining RPC behavior when passed to NewClientStream
	// and ClientConn.NewStream. At least one must be true.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//  1. Call Close on the ClientConn.
//  2. Cancel the context provided.
//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//     client-streaming RPC, for instance, might use the helper function
//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
//     guaranteed to release all resources).
//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	// When a stream interceptor is configured it is handed newClientStream as
	// the streamer to invoke; otherwise newClientStream is called directly.
	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

// newClientStream validates the outgoing metadata carried by ctx, waits for
// the resolver to produce addresses, asks the ClientConn's config selector for
// the per-RPC configuration, and then creates the stream through
// newClientStreamWithParams (wrapped by the config selector's interceptor when
// one is returned).
//
// When channelz is on, the started-calls counter is incremented and, through
// the named result err, the failed-calls counter is incremented if this
// function returns an error.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		// validate md
		if err := imetadata.Validate(md); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		// validate added
		for _, kvs := range added {
			// Each kvs is walked two entries at a time as (key, value) pairs.
			for i := 0; i < len(kvs); i += 2 {
				if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
					return nil, status.Error(codes.Internal, err.Error())
				}
			}
		}
	}
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}

	// mc and onCommit are captured by the newStream closure below and are
	// filled in from rpcConfig before the closure is invoked.
	var mc serviceconfig.MethodConfig
	var onCommit func()
	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}

	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			// Restrict the code to the list allowed by gRFC A54.
			if istatus.IsRestrictedControlPlaneCode(st) {
				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
			}
			return nil, err
		}
		return nil, toRPCErr(err)
	}

	if rpcConfig != nil {
		if rpcConfig.Context != nil {
			ctx = rpcConfig.Context
		}
		mc = rpcConfig.MethodConfig
		onCommit = rpcConfig.OnCommitted
		if rpcConfig.Interceptor != nil {
			rpcInfo.Context = nil
			// Keep the plain stream constructor as ns so the interceptor can
			// call through to it.
			ns := newStream
			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
				if err != nil {
					return nil, toRPCErr(err)
				}
				return cs, nil
			}
		}
	}

	return newStream(ctx, func() {})
}

// newClientStreamWithParams builds a clientStream for method using the given
// method config and call options, then runs the first operation (pick a
// transport and create a transport stream) through the retry machinery.
//
// mc supplies the wait-for-ready setting, timeout and message size limits;
// onCommit is stored on the stream and doneFunc is placed in the call header.
// A child context is always derived from ctx; it is canceled here if an error
// is returned, and otherwise its cancel function is stored on the stream.
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	c := defaultCallInfo()
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// Release the child context if stream creation fails; on success the
	// cancel function is kept in cs.cancel.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
		DoneFunc:       doneFunc,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	// Collect method loggers from both the binarylog package and the
	// dial-option binary logger, when they return one for this method.
	if ml := binarylog.GetMethodLogger(method); ml != nil {
		cs.binlogs = append(cs.binlogs, ml)
	}
	if cc.dopts.binaryLogger != nil {
		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
			cs.binlogs = append(cs.binlogs, ml)
		}
	}

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
		if err := a.getTransport(); err != nil {
			return err
		}
		if err := a.newStream(); err != nil {
			return err
		}
		// Because this operation is always called either here (while creating
		// the clientStream) or by the retry code while locked when replaying
		// the operation, it is safe to access cs.attempt directly.
		cs.attempt = a
		return nil
	}
	// On success the op is buffered (with size 0) so it can be replayed on a
	// later retry attempt.
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		return nil, err
	}

	if len(cs.binlogs) != 0 {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline already in the past is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, logEntry)
		}
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
	// Refuse to start an attempt once the stream context or the ClientConn
	// context has ended.
	if err := cs.ctx.Err(); err != nil {
		return nil, toRPCErr(err)
	}
	if err := cs.cc.ctx.Err(); err != nil {
		return nil, ErrClientConnClosing
	}

	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
	method := cs.callHdr.Method
	var beginTime time.Time
	shs := cs.cc.dopts.copts.StatsHandlers
	// Each stats handler tags the context in turn and then receives a Begin
	// event; beginTime keeps the timestamp taken for the last handler.
	for _, sh := range shs {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:                    true,
			BeginTime:                 beginTime,
			FailFast:                  cs.callInfo.failFast,
			IsClientStream:            cs.desc.ClientStreams,
			IsServerStream:            cs.desc.ServerStreams,
			IsTransparentRetryAttempt: isTransparent,
		}
		sh.HandleRPC(ctx, begin)
	}

	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}

	if cs.cc.parsedTarget.URL.Scheme == "xds" {
		// Add extra metadata (metadata that will be added by transport) to context
		// so the balancer can see them.
		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
		))
	}

	return &csAttempt{
		ctx:           ctx,
		beginTime:     beginTime,
		cs:            cs,
		dc:            cs.cc.dopts.dc,
		statsHandlers: shs,
		trInfo:        trInfo,
	}, nil
}

// getTransport asks the ClientConn for a transport and pick result for this
// attempt and stores them on the attempt. A dropError is unwrapped to its
// embedded error and marks the attempt as dropped. On success the transport's
// remote address is recorded in the trace's first line when tracing is
// enabled.
func (a *csAttempt) getTransport() error {
	cs := a.cs

	var err error
	a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		if de, ok := err.(dropError); ok {
			err = de.error
			a.drop = true
		}
		return err
	}
	if a.trInfo != nil {
		a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
	}
	return nil
}

// newStream creates the transport stream for this attempt on a.t. It records
// the number of previous retries in the call header, merges any metadata from
// the pick result into the attempt's outgoing context, and on success stores
// the stream and a parser that reads from it.
//
// A *transport.NewStreamError is unwrapped and converted with toRPCErr, and
// its AllowTransparentRetry flag is copied onto the attempt; any other error
// type is returned unchanged.
func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries

	// Merge metadata stored in PickResult, if any, with existing call metadata.
	// It is safe to overwrite the csAttempt's context here, since all state
	// maintained in it are local to the attempt. When the attempt has to be
	// retried, a new instance of csAttempt will be created.
	if a.pickResult.Metatada != nil {
		// We currently do not have a function in the metadata package which
		// merges given metadata with existing metadata in a context. Existing
		// function `AppendToOutgoingContext()` takes a variadic argument of key
		// value pairs.
		//
		// TODO: Make it possible to retrieve key value pairs from metadata.MD
		// in a form passable to AppendToOutgoingContext(), or create a version
		// of AppendToOutgoingContext() that accepts a metadata.MD.
		md, _ := metadata.FromOutgoingContext(a.ctx)
		md = metadata.Join(md, a.pickResult.Metatada)
		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
	}

	s, err := a.t.NewStream(a.ctx, cs.callHdr)
	if err != nil {
		nse, ok := err.(*transport.NewStreamError)
		if !ok {
			// Unexpected.
			return err
		}

		if nse.AllowTransparentRetry {
			a.allowTransparentRetry = true
		}

		// Unwrap and convert error.
		return toRPCErr(nse.Err)
	}
	a.s = s
	a.p = &parser{r: s}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast bool // sent an end stream

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt.
	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
	// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
	// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
	// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
	// place where we need to check if the attempt is nil.
	//
	// NOTE(review): in the code visible in this file the field is assigned by
	// withRetry (from newAttemptLocked's result) and by the stream-creation op
	// in newClientStreamWithParams, not inside newAttemptLocked itself, and
	// finish is called from withRetry on that error path. The description
	// above may be stale; confirm before relying on it.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool // active attempt committed for retry?
	onCommit   func()
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	ctx        context.Context
	cs         *clientStream
	t          transport.ClientTransport
	s          *transport.Stream
	p          *parser
	pickResult balancer.PickResult

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandlers []stats.Handler
	beginTime     time.Time

	// set for newStream errors that may be transparently retried
	allowTransparentRetry bool
	// set for pick errors that are returned as a status
	drop bool
}

// commitAttemptLocked marks the stream as committed and drops the replay
// buffer. The onCommit callback, if set, is invoked only on the first
// transition to the committed state. The "Locked" suffix indicates the caller
// is expected to hold cs.mu (commitAttempt below takes it before calling).
func (cs *clientStream) commitAttemptLocked() {
	if !cs.committed && cs.onCommit != nil {
		cs.onCommit()
	}
	cs.committed = true
	cs.buffer = nil
}

// commitAttempt is commitAttemptLocked wrapped in cs.mu.
func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
604 func (a *csAttempt) shouldRetry(err error) (bool, error) { 605 cs := a.cs 606 607 if cs.finished || cs.committed || a.drop { 608 // RPC is finished or committed or was dropped by the picker; cannot retry. 609 return false, err 610 } 611 if a.s == nil && a.allowTransparentRetry { 612 return true, nil 613 } 614 // Wait for the trailers. 615 unprocessed := false 616 if a.s != nil { 617 <-a.s.Done() 618 unprocessed = a.s.Unprocessed() 619 } 620 if cs.firstAttempt && unprocessed { 621 // First attempt, stream unprocessed: transparently retry. 622 return true, nil 623 } 624 if cs.cc.dopts.disableRetry { 625 return false, err 626 } 627 628 pushback := 0 629 hasPushback := false 630 if a.s != nil { 631 if !a.s.TrailersOnly() { 632 return false, err 633 } 634 635 // TODO(retry): Move down if the spec changes to not check server pushback 636 // before considering this a failure for throttling. 637 sps := a.s.Trailer()["grpc-retry-pushback-ms"] 638 if len(sps) == 1 { 639 var e error 640 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { 641 channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0]) 642 cs.retryThrottler.throttle() // This counts as a failure for throttling. 643 return false, err 644 } 645 hasPushback = true 646 } else if len(sps) > 1 { 647 channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps) 648 cs.retryThrottler.throttle() // This counts as a failure for throttling. 649 return false, err 650 } 651 } 652 653 var code codes.Code 654 if a.s != nil { 655 code = a.s.Status().Code() 656 } else { 657 code = status.Code(err) 658 } 659 660 rp := cs.methodConfig.RetryPolicy 661 if rp == nil || !rp.RetryableStatusCodes[code] { 662 return false, err 663 } 664 665 // Note: the ordering here is important; we count this as a failure 666 // only if the code matched a retryable code. 
667 if cs.retryThrottler.throttle() { 668 return false, err 669 } 670 if cs.numRetries+1 >= rp.MaxAttempts { 671 return false, err 672 } 673 674 var dur time.Duration 675 if hasPushback { 676 dur = time.Millisecond * time.Duration(pushback) 677 cs.numRetriesSincePushback = 0 678 } else { 679 fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback)) 680 cur := float64(rp.InitialBackoff) * fact 681 if max := float64(rp.MaxBackoff); cur > max { 682 cur = max 683 } 684 dur = time.Duration(grpcrand.Int63n(int64(cur))) 685 cs.numRetriesSincePushback++ 686 } 687 688 // TODO(dfawley): we could eagerly fail here if dur puts us past the 689 // deadline, but unsure if it is worth doing. 690 t := time.NewTimer(dur) 691 select { 692 case <-t.C: 693 cs.numRetries++ 694 return false, nil 695 case <-cs.ctx.Done(): 696 t.Stop() 697 return false, status.FromContextError(cs.ctx.Err()).Err() 698 } 699 } 700 701 // Returns nil if a retry was performed and succeeded; error otherwise. 702 func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error { 703 for { 704 attempt.finish(toRPCErr(lastErr)) 705 isTransparent, err := attempt.shouldRetry(lastErr) 706 if err != nil { 707 cs.commitAttemptLocked() 708 return err 709 } 710 cs.firstAttempt = false 711 attempt, err = cs.newAttemptLocked(isTransparent) 712 if err != nil { 713 // Only returns error if the clientconn is closed or the context of 714 // the stream is canceled. 715 return err 716 } 717 // Note that the first op in the replay buffer always sets cs.attempt 718 // if it is able to pick a transport and create a stream. 719 if lastErr = cs.replayBufferLocked(attempt); lastErr == nil { 720 return nil 721 } 722 } 723 } 724 725 func (cs *clientStream) Context() context.Context { 726 cs.commitAttempt() 727 // No need to lock before using attempt, since we know it is committed and 728 // cannot change. 
729 if cs.attempt.s != nil { 730 return cs.attempt.s.Context() 731 } 732 return cs.ctx 733 } 734 735 func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { 736 cs.mu.Lock() 737 for { 738 if cs.committed { 739 cs.mu.Unlock() 740 // toRPCErr is used in case the error from the attempt comes from 741 // NewClientStream, which intentionally doesn't return a status 742 // error to allow for further inspection; all other errors should 743 // already be status errors. 744 return toRPCErr(op(cs.attempt)) 745 } 746 if len(cs.buffer) == 0 { 747 // For the first op, which controls creation of the stream and 748 // assigns cs.attempt, we need to create a new attempt inline 749 // before executing the first op. On subsequent ops, the attempt 750 // is created immediately before replaying the ops. 751 var err error 752 if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil { 753 cs.mu.Unlock() 754 cs.finish(err) 755 return err 756 } 757 } 758 a := cs.attempt 759 cs.mu.Unlock() 760 err := op(a) 761 cs.mu.Lock() 762 if a != cs.attempt { 763 // We started another attempt already. 
764 continue 765 } 766 if err == io.EOF { 767 <-a.s.Done() 768 } 769 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { 770 onSuccess() 771 cs.mu.Unlock() 772 return err 773 } 774 if err := cs.retryLocked(a, err); err != nil { 775 cs.mu.Unlock() 776 return err 777 } 778 } 779 } 780 781 func (cs *clientStream) Header() (metadata.MD, error) { 782 var m metadata.MD 783 noHeader := false 784 err := cs.withRetry(func(a *csAttempt) error { 785 var err error 786 m, err = a.s.Header() 787 if err == transport.ErrNoHeaders { 788 noHeader = true 789 return nil 790 } 791 return toRPCErr(err) 792 }, cs.commitAttemptLocked) 793 794 if err != nil { 795 cs.finish(err) 796 return nil, err 797 } 798 799 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader { 800 // Only log if binary log is on and header has not been logged, and 801 // there is actually headers to log. 802 logEntry := &binarylog.ServerHeader{ 803 OnClientSide: true, 804 Header: m, 805 PeerAddr: nil, 806 } 807 if peer, ok := peer.FromContext(cs.Context()); ok { 808 logEntry.PeerAddr = peer.Addr 809 } 810 cs.serverHeaderBinlogged = true 811 for _, binlog := range cs.binlogs { 812 binlog.Log(cs.ctx, logEntry) 813 } 814 } 815 return m, nil 816 } 817 818 func (cs *clientStream) Trailer() metadata.MD { 819 // On RPC failure, we never need to retry, because usage requires that 820 // RecvMsg() returned a non-nil error before calling this function is valid. 821 // We would have retried earlier if necessary. 822 // 823 // Commit the attempt anyway, just in case users are not following those 824 // directions -- it will prevent races and should not meaningfully impact 825 // performance. 
826 cs.commitAttempt() 827 if cs.attempt.s == nil { 828 return nil 829 } 830 return cs.attempt.s.Trailer() 831 } 832 833 func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error { 834 for _, f := range cs.buffer { 835 if err := f(attempt); err != nil { 836 return err 837 } 838 } 839 return nil 840 } 841 842 func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { 843 // Note: we still will buffer if retry is disabled (for transparent retries). 844 if cs.committed { 845 return 846 } 847 cs.bufferSize += sz 848 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { 849 cs.commitAttemptLocked() 850 return 851 } 852 cs.buffer = append(cs.buffer, op) 853 } 854 855 func (cs *clientStream) SendMsg(m interface{}) (err error) { 856 defer func() { 857 if err != nil && err != io.EOF { 858 // Call finish on the client stream for errors generated by this SendMsg 859 // call, as these indicate problems created by this client. (Transport 860 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 861 // error will be returned from RecvMsg eventually in that case, or be 862 // retried.) 863 cs.finish(err) 864 } 865 }() 866 if cs.sentLast { 867 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 868 } 869 if !cs.desc.ClientStreams { 870 cs.sentLast = true 871 } 872 873 // load hdr, payload, data 874 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) 875 if err != nil { 876 return err 877 } 878 879 // TODO(dfawley): should we be checking len(data) instead? 880 if len(payload) > *cs.callInfo.maxSendMessageSize { 881 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. 
%d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	// op performs the actual send on whichever attempt withRetry selects.
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, data)
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	// Binary-log the outgoing message only when the send reported no error.
	if len(cs.binlogs) != 0 && err == nil {
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, cm)
		}
	}
	return err
}

// RecvMsg receives one message into m by running csAttempt.recvMsg through
// withRetry.
//
// When binary loggers are configured it also logs the server header (by
// calling Header first if it has not been logged yet), every successfully
// received message and, once the stream ends, the server trailer.
//
// A non-nil error, or a descriptor that is not server-streaming, marks the
// end of the stream: finish is called with that error before returning it.
func (cs *clientStream) RecvMsg(m interface{}) error {
	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		// Its results are intentionally unused here.
		cs.Header()
	}
	// recvInfo stays nil unless binary logging needs the raw payload bytes.
	var recvInfo *payloadInfo
	if len(cs.binlogs) != 0 {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if len(cs.binlogs) != 0 && err == nil {
		sm := &binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, sm)
		}
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if len(cs.binlogs) != 0 {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			// io.EOF is a successful end of stream, so it is logged as no error.
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			// Note: the local name peer shadows the peer package inside this if.
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(cs.ctx, logEntry)
			}
		}
	}
	return err
}

// CloseSend half-closes the stream from the client side by writing an empty
// frame with Last set on the current attempt, and binary-logs a
// ClientHalfClose entry when binary loggers are configured.
//
// It always returns nil. Calling it again after the send side is already
// closed (cs.sentLast is set) is a no-op.
func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	// The result of withRetry is deliberately discarded; see the comment in op.
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if len(cs.binlogs) != 0 {
		chc := &binarylog.ClientHalfClose{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, chc)
		}
	}
	// CloseSend has never reported an error to callers; keep returning nil.
	return nil
}

// finish ends the RPC with the given error. io.EOF is treated as success
// (nil). Only the first call has any effect; cs.finished, guarded by cs.mu,
// makes later calls return immediately.
//
// Under cs.mu it runs the callInfo.onFinish callbacks, commits the attempt,
// finishes the current attempt and, when that attempt has a stream, runs the
// after hook of every call option. After releasing the lock it binary-logs a
// Cancel entry for Canceled statuses, records a success with the retry
// throttler when err is nil, updates the channelz call counters when channelz
// is on, and finally calls cs.cancel.
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	for _, onFinish := range cs.callInfo.onFinish {
		onFinish(err)
	}
	cs.commitAttemptLocked()
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.s != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo, cs.attempt)
			}
		}
	}
	cs.mu.Unlock()
	// For binary logging. only log cancel in finish (could be caused by RPC ctx
	// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
		c := &binarylog.Cancel{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, c)
		}
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	cs.cancel()
}

// sendMsg writes one already-encoded message on this attempt's transport
// stream.
//
// m is the original message (used for tracing and stats), hdr and payld are
// the bytes handed to the transport, and data is passed to outPayload for the
// stats handlers. Last is set on the write when the RPC is not
// client-streaming.
//
// A transport write error is reported as nil for non-client-streaming RPCs
// and as io.EOF otherwise. On success the stats handlers are notified and,
// when channelz is on, the transport's sent-message counter is incremented.
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo != nil {
		// a.mu is held while a.trInfo.tr is inspected and used.
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

// recvMsg receives one message into m on this attempt.
//
// payInfo, when non-nil, is passed to recv for the first message and later
// read for stats; one is allocated here if stats handlers are configured and
// the caller passed nil.
//
// On the first call it picks the decompressor from the stream's received
// message encoding. When recv returns io.EOF the stream status decides the
// result: the status error if there is one, io.EOF otherwise. Other recv
// errors are converted with toRPCErr.
//
// For RPCs that are not server-streaming, a second recv is issued after the
// first message and is expected to return io.EOF; the stream status error
// (nil on success) is returned in that case, and a second message is reported
// as a protocol violation.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if len(a.statsHandlers) != 0 && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}

		return toRPCErr(err)
	}
	if a.trInfo != nil {
		// a.mu is held while a.trInfo.tr is inspected and used.
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:             payInfo.uncompressedBytes,
			WireLength:       payInfo.compressedLength + headerLen,
			CompressedLength: payInfo.compressedLength,
			Length:           len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

// finish ends this attempt with the given error. io.EOF is treated as success
// (nil). Only the first call has any effect; a.finished, guarded by a.mu,
// makes later calls return immediately.
//
// If the attempt has a stream, the stream is closed on the transport and its
// trailer is read. The balancer's Done callback (if any) and every stats
// handler are then notified, and the trace, if present, is completed and
// cleared. All of this happens while a.mu is held.
func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	// tr stays nil when the attempt never obtained a stream.
	var tr metadata.MD
	if a.s != nil {
		a.t.CloseStream(a.s, err)
		tr = a.s.Trailer()
	}

	if a.pickResult.Done != nil {
		br := false
		if a.s != nil {
			br = a.s.BytesReceived()
		}
		a.pickResult.Done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
			ServerLoad:    balancerload.Parse(tr),
		})
	}
	for _, sh := range a.statsHandlers {
		end := &stats.End{
			Client:    true,
			BeginTime: a.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		sh.HandleRPC(a.ctx, end)
	}
	if a.trInfo != nil && a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		// Clearing tr makes later trace-logging attempts skip it.
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transport.
1180 // 1181 // Main difference between this and ClientConn.NewStream: 1182 // - no retry 1183 // - no service config (or wait for service config) 1184 // - no tracing or stats 1185 func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) { 1186 if t == nil { 1187 // TODO: return RPC error here? 1188 return nil, errors.New("transport provided is nil") 1189 } 1190 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. 1191 c := &callInfo{} 1192 1193 // Possible context leak: 1194 // The cancel function for the child context we create will only be called 1195 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 1196 // an error is generated by SendMsg. 1197 // https://github.com/grpc/grpc-go/issues/1818. 1198 ctx, cancel := context.WithCancel(ctx) 1199 defer func() { 1200 if err != nil { 1201 cancel() 1202 } 1203 }() 1204 1205 for _, o := range opts { 1206 if err := o.before(c); err != nil { 1207 return nil, toRPCErr(err) 1208 } 1209 } 1210 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 1211 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) 1212 if err := setCallInfoCodec(c); err != nil { 1213 return nil, err 1214 } 1215 1216 callHdr := &transport.CallHdr{ 1217 Host: ac.cc.authority, 1218 Method: method, 1219 ContentSubtype: c.contentSubtype, 1220 } 1221 1222 // Set our outgoing compression according to the UseCompressor CallOption, if 1223 // set. In that case, also find the compressor from the encoding package. 1224 // Otherwise, use the compressor configured by the WithCompressor DialOption, 1225 // if set. 
1226 var cp Compressor 1227 var comp encoding.Compressor 1228 if ct := c.compressorType; ct != "" { 1229 callHdr.SendCompress = ct 1230 if ct != encoding.Identity { 1231 comp = encoding.GetCompressor(ct) 1232 if comp == nil { 1233 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 1234 } 1235 } 1236 } else if ac.cc.dopts.cp != nil { 1237 callHdr.SendCompress = ac.cc.dopts.cp.Type() 1238 cp = ac.cc.dopts.cp 1239 } 1240 if c.creds != nil { 1241 callHdr.Creds = c.creds 1242 } 1243 1244 // Use a special addrConnStream to avoid retry. 1245 as := &addrConnStream{ 1246 callHdr: callHdr, 1247 ac: ac, 1248 ctx: ctx, 1249 cancel: cancel, 1250 opts: opts, 1251 callInfo: c, 1252 desc: desc, 1253 codec: c.codec, 1254 cp: cp, 1255 comp: comp, 1256 t: t, 1257 } 1258 1259 s, err := as.t.NewStream(as.ctx, as.callHdr) 1260 if err != nil { 1261 err = toRPCErr(err) 1262 return nil, err 1263 } 1264 as.s = s 1265 as.p = &parser{r: s} 1266 ac.incrCallsStarted() 1267 if desc != unaryStreamDesc { 1268 // Listen on cc and stream contexts to cleanup when the user closes the 1269 // ClientConn or cancels the stream context. In all other cases, an error 1270 // should already be injected into the recv buffer by the transport, which 1271 // the client will eventually receive, and then we will cancel the stream's 1272 // context in clientStream.finish. 
1273 go func() { 1274 select { 1275 case <-ac.ctx.Done(): 1276 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing")) 1277 case <-ctx.Done(): 1278 as.finish(toRPCErr(ctx.Err())) 1279 } 1280 }() 1281 } 1282 return as, nil 1283 } 1284 1285 type addrConnStream struct { 1286 s *transport.Stream 1287 ac *addrConn 1288 callHdr *transport.CallHdr 1289 cancel context.CancelFunc 1290 opts []CallOption 1291 callInfo *callInfo 1292 t transport.ClientTransport 1293 ctx context.Context 1294 sentLast bool 1295 desc *StreamDesc 1296 codec baseCodec 1297 cp Compressor 1298 comp encoding.Compressor 1299 decompSet bool 1300 dc Decompressor 1301 decomp encoding.Compressor 1302 p *parser 1303 mu sync.Mutex 1304 finished bool 1305 } 1306 1307 func (as *addrConnStream) Header() (metadata.MD, error) { 1308 m, err := as.s.Header() 1309 if err != nil { 1310 as.finish(toRPCErr(err)) 1311 } 1312 return m, err 1313 } 1314 1315 func (as *addrConnStream) Trailer() metadata.MD { 1316 return as.s.Trailer() 1317 } 1318 1319 func (as *addrConnStream) CloseSend() error { 1320 if as.sentLast { 1321 // TODO: return an error and finish the stream instead, due to API misuse? 1322 return nil 1323 } 1324 as.sentLast = true 1325 1326 as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) 1327 // Always return nil; io.EOF is the only error that might make sense 1328 // instead, but there is no need to signal the client to call RecvMsg 1329 // as the only use left for the stream after CloseSend is to call 1330 // RecvMsg. This also matches historical behavior. 1331 return nil 1332 } 1333 1334 func (as *addrConnStream) Context() context.Context { 1335 return as.s.Context() 1336 } 1337 1338 func (as *addrConnStream) SendMsg(m interface{}) (err error) { 1339 defer func() { 1340 if err != nil && err != io.EOF { 1341 // Call finish on the client stream for errors generated by this SendMsg 1342 // call, as these indicate problems created by this client. 
(Transport 1343 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 1344 // error will be returned from RecvMsg eventually in that case, or be 1345 // retried.) 1346 as.finish(err) 1347 } 1348 }() 1349 if as.sentLast { 1350 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 1351 } 1352 if !as.desc.ClientStreams { 1353 as.sentLast = true 1354 } 1355 1356 // load hdr, payload, data 1357 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) 1358 if err != nil { 1359 return err 1360 } 1361 1362 // TODO(dfawley): should we be checking len(data) instead? 1363 if len(payld) > *as.callInfo.maxSendMessageSize { 1364 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) 1365 } 1366 1367 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { 1368 if !as.desc.ClientStreams { 1369 // For non-client-streaming RPCs, we return nil instead of EOF on error 1370 // because the generated code requires it. finish is not called; RecvMsg() 1371 // will call it with the stream's status independently. 1372 return nil 1373 } 1374 return io.EOF 1375 } 1376 1377 if channelz.IsOn() { 1378 as.t.IncrMsgSent() 1379 } 1380 return nil 1381 } 1382 1383 func (as *addrConnStream) RecvMsg(m interface{}) (err error) { 1384 defer func() { 1385 if err != nil || !as.desc.ServerStreams { 1386 // err != nil or non-server-streaming indicates end of stream. 1387 as.finish(err) 1388 } 1389 }() 1390 1391 if !as.decompSet { 1392 // Block until we receive headers containing received message encoding. 1393 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity { 1394 if as.dc == nil || as.dc.Type() != ct { 1395 // No configured decompressor, or it does not match the incoming 1396 // message encoding; attempt to find a registered compressor that does. 
1397 as.dc = nil 1398 as.decomp = encoding.GetCompressor(ct) 1399 } 1400 } else { 1401 // No compression is used; disable our decompressor. 1402 as.dc = nil 1403 } 1404 // Only initialize this state once per stream. 1405 as.decompSet = true 1406 } 1407 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1408 if err != nil { 1409 if err == io.EOF { 1410 if statusErr := as.s.Status().Err(); statusErr != nil { 1411 return statusErr 1412 } 1413 return io.EOF // indicates successful end of stream. 1414 } 1415 return toRPCErr(err) 1416 } 1417 1418 if channelz.IsOn() { 1419 as.t.IncrMsgRecv() 1420 } 1421 if as.desc.ServerStreams { 1422 // Subsequent messages should be received by subsequent RecvMsg calls. 1423 return nil 1424 } 1425 1426 // Special handling for non-server-stream rpcs. 1427 // This recv expects EOF or errors, so we don't collect inPayload. 1428 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1429 if err == nil { 1430 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 1431 } 1432 if err == io.EOF { 1433 return as.s.Status().Err() // non-server streaming Recv returns nil on success 1434 } 1435 return toRPCErr(err) 1436 } 1437 1438 func (as *addrConnStream) finish(err error) { 1439 as.mu.Lock() 1440 if as.finished { 1441 as.mu.Unlock() 1442 return 1443 } 1444 as.finished = true 1445 if err == io.EOF { 1446 // Ending a stream with EOF indicates a success. 1447 err = nil 1448 } 1449 if as.s != nil { 1450 as.t.CloseStream(as.s, err) 1451 } 1452 1453 if err != nil { 1454 as.ac.incrCallsFailed() 1455 } else { 1456 as.ac.incrCallsSucceeded() 1457 } 1458 as.cancel() 1459 as.mu.Unlock() 1460 } 1461 1462 // ServerStream defines the server-side behavior of a streaming RPC. 1463 // 1464 // Errors returned from ServerStream methods are compatible with the status 1465 // package. 
However, the status code will often not match the RPC status as 1466 // seen by the client application, and therefore, should not be relied upon for 1467 // this purpose. 1468 type ServerStream interface { 1469 // SetHeader sets the header metadata. It may be called multiple times. 1470 // When call multiple times, all the provided metadata will be merged. 1471 // All the metadata will be sent out when one of the following happens: 1472 // - ServerStream.SendHeader() is called; 1473 // - The first response is sent out; 1474 // - An RPC status is sent out (error or success). 1475 SetHeader(metadata.MD) error 1476 // SendHeader sends the header metadata. 1477 // The provided md and headers set by SetHeader() will be sent. 1478 // It fails if called multiple times. 1479 SendHeader(metadata.MD) error 1480 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 1481 // When called more than once, all the provided metadata will be merged. 1482 SetTrailer(metadata.MD) 1483 // Context returns the context for this stream. 1484 Context() context.Context 1485 // SendMsg sends a message. On error, SendMsg aborts the stream and the 1486 // error is returned directly. 1487 // 1488 // SendMsg blocks until: 1489 // - There is sufficient flow control to schedule m with the transport, or 1490 // - The stream is done, or 1491 // - The stream breaks. 1492 // 1493 // SendMsg does not wait until the message is received by the client. An 1494 // untimely stream closure may result in lost messages. 1495 // 1496 // It is safe to have a goroutine calling SendMsg and another goroutine 1497 // calling RecvMsg on the same stream at the same time, but it is not safe 1498 // to call SendMsg on the same stream in different goroutines. 1499 // 1500 // It is not safe to modify the message after calling SendMsg. Tracing 1501 // libraries and stats handlers may use the message lazily. 
1502 SendMsg(m interface{}) error 1503 // RecvMsg blocks until it receives a message into m or the stream is 1504 // done. It returns io.EOF when the client has performed a CloseSend. On 1505 // any non-EOF error, the stream is aborted and the error contains the 1506 // RPC status. 1507 // 1508 // It is safe to have a goroutine calling SendMsg and another goroutine 1509 // calling RecvMsg on the same stream at the same time, but it is not 1510 // safe to call RecvMsg on the same stream in different goroutines. 1511 RecvMsg(m interface{}) error 1512 } 1513 1514 // serverStream implements a server side Stream. 1515 type serverStream struct { 1516 ctx context.Context 1517 t transport.ServerTransport 1518 s *transport.Stream 1519 p *parser 1520 codec baseCodec 1521 1522 cp Compressor 1523 dc Decompressor 1524 comp encoding.Compressor 1525 decomp encoding.Compressor 1526 1527 sendCompressorName string 1528 1529 maxReceiveMessageSize int 1530 maxSendMessageSize int 1531 trInfo *traceInfo 1532 1533 statsHandler []stats.Handler 1534 1535 binlogs []binarylog.MethodLogger 1536 // serverHeaderBinlogged indicates whether server header has been logged. It 1537 // will happen when one of the following two happens: stream.SendHeader(), 1538 // stream.Send(). 1539 // 1540 // It's only checked in send and sendHeader, doesn't need to be 1541 // synchronized. 1542 serverHeaderBinlogged bool 1543 1544 mu sync.Mutex // protects trInfo.tr after the service handler runs. 
1545 } 1546 1547 func (ss *serverStream) Context() context.Context { 1548 return ss.ctx 1549 } 1550 1551 func (ss *serverStream) SetHeader(md metadata.MD) error { 1552 if md.Len() == 0 { 1553 return nil 1554 } 1555 err := imetadata.Validate(md) 1556 if err != nil { 1557 return status.Error(codes.Internal, err.Error()) 1558 } 1559 return ss.s.SetHeader(md) 1560 } 1561 1562 func (ss *serverStream) SendHeader(md metadata.MD) error { 1563 err := imetadata.Validate(md) 1564 if err != nil { 1565 return status.Error(codes.Internal, err.Error()) 1566 } 1567 1568 err = ss.t.WriteHeader(ss.s, md) 1569 if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged { 1570 h, _ := ss.s.Header() 1571 sh := &binarylog.ServerHeader{ 1572 Header: h, 1573 } 1574 ss.serverHeaderBinlogged = true 1575 for _, binlog := range ss.binlogs { 1576 binlog.Log(ss.ctx, sh) 1577 } 1578 } 1579 return err 1580 } 1581 1582 func (ss *serverStream) SetTrailer(md metadata.MD) { 1583 if md.Len() == 0 { 1584 return 1585 } 1586 if err := imetadata.Validate(md); err != nil { 1587 logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err) 1588 } 1589 ss.s.SetTrailer(md) 1590 } 1591 1592 func (ss *serverStream) SendMsg(m interface{}) (err error) { 1593 defer func() { 1594 if ss.trInfo != nil { 1595 ss.mu.Lock() 1596 if ss.trInfo.tr != nil { 1597 if err == nil { 1598 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 1599 } else { 1600 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1601 ss.trInfo.tr.SetError() 1602 } 1603 } 1604 ss.mu.Unlock() 1605 } 1606 if err != nil && err != io.EOF { 1607 st, _ := status.FromError(toRPCErr(err)) 1608 ss.t.WriteStatus(ss.s, st) 1609 // Non-user specified status was sent out. This should be an error 1610 // case (as a server side Cancel maybe). 1611 // 1612 // This is not handled specifically now. User will return a final 1613 // status from the service handler, we will log that error instead. 
1614 // This behavior is similar to an interceptor. 1615 } 1616 if channelz.IsOn() && err == nil { 1617 ss.t.IncrMsgSent() 1618 } 1619 }() 1620 1621 // Server handler could have set new compressor by calling SetSendCompressor. 1622 // In case it is set, we need to use it for compressing outbound message. 1623 if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName { 1624 ss.comp = encoding.GetCompressor(sendCompressorsName) 1625 ss.sendCompressorName = sendCompressorsName 1626 } 1627 1628 // load hdr, payload, data 1629 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) 1630 if err != nil { 1631 return err 1632 } 1633 1634 // TODO(dfawley): should we be checking len(data) instead? 1635 if len(payload) > ss.maxSendMessageSize { 1636 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) 1637 } 1638 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { 1639 return toRPCErr(err) 1640 } 1641 if len(ss.binlogs) != 0 { 1642 if !ss.serverHeaderBinlogged { 1643 h, _ := ss.s.Header() 1644 sh := &binarylog.ServerHeader{ 1645 Header: h, 1646 } 1647 ss.serverHeaderBinlogged = true 1648 for _, binlog := range ss.binlogs { 1649 binlog.Log(ss.ctx, sh) 1650 } 1651 } 1652 sm := &binarylog.ServerMessage{ 1653 Message: data, 1654 } 1655 for _, binlog := range ss.binlogs { 1656 binlog.Log(ss.ctx, sm) 1657 } 1658 } 1659 if len(ss.statsHandler) != 0 { 1660 for _, sh := range ss.statsHandler { 1661 sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) 1662 } 1663 } 1664 return nil 1665 } 1666 1667 func (ss *serverStream) RecvMsg(m interface{}) (err error) { 1668 defer func() { 1669 if ss.trInfo != nil { 1670 ss.mu.Lock() 1671 if ss.trInfo.tr != nil { 1672 if err == nil { 1673 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 1674 } else if err != io.EOF { 1675 
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1676 ss.trInfo.tr.SetError() 1677 } 1678 } 1679 ss.mu.Unlock() 1680 } 1681 if err != nil && err != io.EOF { 1682 st, _ := status.FromError(toRPCErr(err)) 1683 ss.t.WriteStatus(ss.s, st) 1684 // Non-user specified status was sent out. This should be an error 1685 // case (as a server side Cancel maybe). 1686 // 1687 // This is not handled specifically now. User will return a final 1688 // status from the service handler, we will log that error instead. 1689 // This behavior is similar to an interceptor. 1690 } 1691 if channelz.IsOn() && err == nil { 1692 ss.t.IncrMsgRecv() 1693 } 1694 }() 1695 var payInfo *payloadInfo 1696 if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 { 1697 payInfo = &payloadInfo{} 1698 } 1699 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { 1700 if err == io.EOF { 1701 if len(ss.binlogs) != 0 { 1702 chc := &binarylog.ClientHalfClose{} 1703 for _, binlog := range ss.binlogs { 1704 binlog.Log(ss.ctx, chc) 1705 } 1706 } 1707 return err 1708 } 1709 if err == io.ErrUnexpectedEOF { 1710 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 1711 } 1712 return toRPCErr(err) 1713 } 1714 if len(ss.statsHandler) != 0 { 1715 for _, sh := range ss.statsHandler { 1716 sh.HandleRPC(ss.s.Context(), &stats.InPayload{ 1717 RecvTime: time.Now(), 1718 Payload: m, 1719 // TODO truncate large payload. 1720 Data: payInfo.uncompressedBytes, 1721 Length: len(payInfo.uncompressedBytes), 1722 WireLength: payInfo.compressedLength + headerLen, 1723 CompressedLength: payInfo.compressedLength, 1724 }) 1725 } 1726 } 1727 if len(ss.binlogs) != 0 { 1728 cm := &binarylog.ClientMessage{ 1729 Message: payInfo.uncompressedBytes, 1730 } 1731 for _, binlog := range ss.binlogs { 1732 binlog.Log(ss.ctx, cm) 1733 } 1734 } 1735 return nil 1736 } 1737 1738 // MethodFromServerStream returns the method string for the input stream. 
1739 // The returned string is in the format of "/service/method". 1740 func MethodFromServerStream(stream ServerStream) (string, bool) { 1741 return Method(stream.Context()) 1742 } 1743 1744 // prepareMsg returns the hdr, payload and data 1745 // using the compressors passed or using the 1746 // passed preparedmsg 1747 func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { 1748 if preparedMsg, ok := m.(*PreparedMsg); ok { 1749 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil 1750 } 1751 // The input interface is not a prepared msg. 1752 // Marshal and Compress the data at this point 1753 data, err = encode(codec, m) 1754 if err != nil { 1755 return nil, nil, nil, err 1756 } 1757 compData, err := compress(data, cp, comp) 1758 if err != nil { 1759 return nil, nil, nil, err 1760 } 1761 hdr, payload = msgHeader(data, compData) 1762 return hdr, payload, data, nil 1763 }