clientconn.go (54183B)
1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "context" 23 "errors" 24 "fmt" 25 "math" 26 "net/url" 27 "reflect" 28 "strings" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "google.golang.org/grpc/balancer" 34 "google.golang.org/grpc/balancer/base" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/connectivity" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/internal/backoff" 39 "google.golang.org/grpc/internal/channelz" 40 "google.golang.org/grpc/internal/grpcsync" 41 iresolver "google.golang.org/grpc/internal/resolver" 42 "google.golang.org/grpc/internal/transport" 43 "google.golang.org/grpc/keepalive" 44 "google.golang.org/grpc/resolver" 45 "google.golang.org/grpc/serviceconfig" 46 "google.golang.org/grpc/status" 47 48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. 49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. 50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. 51 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. 52 ) 53 54 const ( 55 // minimum time to give a connection to complete 56 minConnectTimeout = 20 * time.Second 57 // must match grpclbName in grpclb/grpclb.go 58 grpclbName = "grpclb" 59 ) 60 61 var ( 62 // ErrClientConnClosing indicates that the operation is illegal because 63 // the ClientConn is closing. 64 // 65 // Deprecated: this error should not be relied upon by users; use the status 66 // code of Canceled instead. 67 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing") 68 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. 69 errConnDrain = errors.New("grpc: the connection is drained") 70 // errConnClosing indicates that the connection is closing. 71 errConnClosing = errors.New("grpc: the connection is closing") 72 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default 73 // service config. 74 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" 75 ) 76 77 // The following errors are returned from Dial and DialContext 78 var ( 79 // errNoTransportSecurity indicates that there is no transport security 80 // being set for ClientConn. Users should either set one or explicitly 81 // call WithInsecure DialOption to disable security. 82 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)") 83 // errTransportCredsAndBundle indicates that creds bundle is used together 84 // with other individual Transport Credentials. 85 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials") 86 // errNoTransportCredsInBundle indicated that the configured creds bundle 87 // returned a transport credentials which was nil. 88 errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials") 89 // errTransportCredentialsMissing indicates that users want to transmit 90 // security information (e.g., OAuth2 token) which requires secure 91 // connection on an insecure connection. 92 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") 93 ) 94 95 const ( 96 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 97 defaultClientMaxSendMessageSize = math.MaxInt32 98 // http2IOBufSize specifies the buffer size for sending frames. 99 defaultWriteBufSize = 32 * 1024 100 defaultReadBufSize = 32 * 1024 101 ) 102 103 // Dial creates a client connection to the given target. 104 func Dial(target string, opts ...DialOption) (*ClientConn, error) { 105 return DialContext(context.Background(), target, opts...) 106 } 107 108 type defaultConfigSelector struct { 109 sc *ServiceConfig 110 } 111 112 func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { 113 return &iresolver.RPCConfig{ 114 Context: rpcInfo.Context, 115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method), 116 }, nil 117 } 118 119 // DialContext creates a client connection to the given target. By default, it's 120 // a non-blocking dial (the function won't wait for connections to be 121 // established, and connecting happens in the background). To make it a blocking 122 // dial, use WithBlock() dial option. 123 // 124 // In the non-blocking case, the ctx does not act against the connection. It 125 // only controls the setup steps. 126 // 127 // In the blocking case, ctx can be used to cancel or expire the pending 128 // connection. Once this function returns, the cancellation and expiration of 129 // ctx will be noop. Users should call ClientConn.Close to terminate all the 130 // pending operations after this function returns. 131 // 132 // The target name syntax is defined in 133 // https://github.com/grpc/grpc/blob/master/doc/naming.md. 134 // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. 135 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 136 cc := &ClientConn{ 137 target: target, 138 csMgr: &connectivityStateManager{}, 139 conns: make(map[*addrConn]struct{}), 140 dopts: defaultDialOptions(), 141 blockingpicker: newPickerWrapper(), 142 czData: new(channelzData), 143 firstResolveEvent: grpcsync.NewEvent(), 144 } 145 cc.retryThrottler.Store((*retryThrottler)(nil)) 146 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) 147 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 148 149 disableGlobalOpts := false 150 for _, opt := range opts { 151 if _, ok := opt.(*disableGlobalDialOptions); ok { 152 disableGlobalOpts = true 153 break 154 } 155 } 156 157 if !disableGlobalOpts { 158 for _, opt := range globalDialOptions { 159 opt.apply(&cc.dopts) 160 } 161 } 162 163 for _, opt := range opts { 164 opt.apply(&cc.dopts) 165 } 166 167 chainUnaryClientInterceptors(cc) 168 chainStreamClientInterceptors(cc) 169 170 defer func() { 171 if err != nil { 172 cc.Close() 173 } 174 }() 175 176 pid := cc.dopts.channelzParentID 177 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target) 178 ted := &channelz.TraceEventDesc{ 179 Desc: "Channel created", 180 Severity: channelz.CtInfo, 181 } 182 if cc.dopts.channelzParentID != nil { 183 ted.Parent = &channelz.TraceEventDesc{ 184 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()), 185 Severity: channelz.CtInfo, 186 } 187 } 188 channelz.AddTraceEvent(logger, cc.channelzID, 1, ted) 189 cc.csMgr.channelzID = cc.channelzID 190 191 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { 192 return nil, errNoTransportSecurity 193 } 194 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { 195 return nil, errTransportCredsAndBundle 196 } 197 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil { 198 return nil, errNoTransportCredsInBundle 199 } 200 transportCreds := cc.dopts.copts.TransportCredentials 201 if transportCreds == nil { 202 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials() 203 } 204 if transportCreds.Info().SecurityProtocol == "insecure" { 205 for _, cd := range cc.dopts.copts.PerRPCCredentials { 206 if cd.RequireTransportSecurity() { 207 return nil, errTransportCredentialsMissing 208 } 209 } 210 } 211 212 if cc.dopts.defaultServiceConfigRawJSON != nil { 213 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON) 214 if scpr.Err != nil { 215 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err) 216 } 217 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig) 218 } 219 cc.mkp = cc.dopts.copts.KeepaliveParams 220 221 if cc.dopts.copts.UserAgent != "" { 222 cc.dopts.copts.UserAgent += " " + grpcUA 223 } else { 224 cc.dopts.copts.UserAgent = grpcUA 225 } 226 227 if cc.dopts.timeout > 0 { 228 var cancel context.CancelFunc 229 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) 230 defer cancel() 231 } 232 defer func() { 233 select { 234 case <-ctx.Done(): 235 switch { 236 case ctx.Err() == err: 237 conn = nil 238 case err == nil || !cc.dopts.returnLastError: 239 conn, err = nil, ctx.Err() 240 default: 241 conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err) 242 } 243 default: 244 } 245 }() 246 247 if cc.dopts.bs == nil { 248 cc.dopts.bs = backoff.DefaultExponential 249 } 250 251 // Determine the resolver to use. 252 resolverBuilder, err := cc.parseTargetAndFindResolver() 253 if err != nil { 254 return nil, err 255 } 256 cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint(), cc.target, cc.dopts) 257 if err != nil { 258 return nil, err 259 } 260 channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) 261 262 if cc.dopts.scChan != nil { 263 // Blocking wait for the initial service config. 264 select { 265 case sc, ok := <-cc.dopts.scChan: 266 if ok { 267 cc.sc = &sc 268 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) 269 } 270 case <-ctx.Done(): 271 return nil, ctx.Err() 272 } 273 } 274 if cc.dopts.scChan != nil { 275 go cc.scWatcher() 276 } 277 278 var credsClone credentials.TransportCredentials 279 if creds := cc.dopts.copts.TransportCredentials; creds != nil { 280 credsClone = creds.Clone() 281 } 282 cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ 283 DialCreds: credsClone, 284 CredsBundle: cc.dopts.copts.CredsBundle, 285 Dialer: cc.dopts.copts.Dialer, 286 Authority: cc.authority, 287 CustomUserAgent: cc.dopts.copts.UserAgent, 288 ChannelzParentID: cc.channelzID, 289 Target: cc.parsedTarget, 290 }) 291 292 // Build the resolver. 293 rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) 294 if err != nil { 295 return nil, fmt.Errorf("failed to build resolver: %v", err) 296 } 297 cc.mu.Lock() 298 cc.resolverWrapper = rWrapper 299 cc.mu.Unlock() 300 301 // A blocking dial blocks until the clientConn is ready. 302 if cc.dopts.block { 303 for { 304 cc.Connect() 305 s := cc.GetState() 306 if s == connectivity.Ready { 307 break 308 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { 309 if err = cc.connectionError(); err != nil { 310 terr, ok := err.(interface { 311 Temporary() bool 312 }) 313 if ok && !terr.Temporary() { 314 return nil, err 315 } 316 } 317 } 318 if !cc.WaitForStateChange(ctx, s) { 319 // ctx got timeout or canceled. 320 if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { 321 return nil, err 322 } 323 return nil, ctx.Err() 324 } 325 } 326 } 327 328 return cc, nil 329 } 330 331 // chainUnaryClientInterceptors chains all unary client interceptors into one. 332 func chainUnaryClientInterceptors(cc *ClientConn) { 333 interceptors := cc.dopts.chainUnaryInts 334 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will 335 // be executed before any other chained interceptors. 336 if cc.dopts.unaryInt != nil { 337 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...) 338 } 339 var chainedInt UnaryClientInterceptor 340 if len(interceptors) == 0 { 341 chainedInt = nil 342 } else if len(interceptors) == 1 { 343 chainedInt = interceptors[0] 344 } else { 345 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error { 346 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...) 347 } 348 } 349 cc.dopts.unaryInt = chainedInt 350 } 351 352 // getChainUnaryInvoker recursively generate the chained unary invoker. 353 func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 354 if curr == len(interceptors)-1 { 355 return finalInvoker 356 } 357 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 358 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 359 } 360 } 361 362 // chainStreamClientInterceptors chains all stream client interceptors into one. 363 func chainStreamClientInterceptors(cc *ClientConn) { 364 interceptors := cc.dopts.chainStreamInts 365 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 366 // be executed before any other chained interceptors. 367 if cc.dopts.streamInt != nil { 368 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 369 } 370 var chainedInt StreamClientInterceptor 371 if len(interceptors) == 0 { 372 chainedInt = nil 373 } else if len(interceptors) == 1 { 374 chainedInt = interceptors[0] 375 } else { 376 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 377 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 378 } 379 } 380 cc.dopts.streamInt = chainedInt 381 } 382 383 // getChainStreamer recursively generate the chained client stream constructor. 384 func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 385 if curr == len(interceptors)-1 { 386 return finalStreamer 387 } 388 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 389 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 390 } 391 } 392 393 // connectivityStateManager keeps the connectivity.State of ClientConn. 394 // This struct will eventually be exported so the balancers can access it. 395 type connectivityStateManager struct { 396 mu sync.Mutex 397 state connectivity.State 398 notifyChan chan struct{} 399 channelzID *channelz.Identifier 400 } 401 402 // updateState updates the connectivity.State of ClientConn. 403 // If there's a change it notifies goroutines waiting on state change to 404 // happen. 405 func (csm *connectivityStateManager) updateState(state connectivity.State) { 406 csm.mu.Lock() 407 defer csm.mu.Unlock() 408 if csm.state == connectivity.Shutdown { 409 return 410 } 411 if csm.state == state { 412 return 413 } 414 csm.state = state 415 channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state) 416 if csm.notifyChan != nil { 417 // There are other goroutines waiting on this channel. 418 close(csm.notifyChan) 419 csm.notifyChan = nil 420 } 421 } 422 423 func (csm *connectivityStateManager) getState() connectivity.State { 424 csm.mu.Lock() 425 defer csm.mu.Unlock() 426 return csm.state 427 } 428 429 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 430 csm.mu.Lock() 431 defer csm.mu.Unlock() 432 if csm.notifyChan == nil { 433 csm.notifyChan = make(chan struct{}) 434 } 435 return csm.notifyChan 436 } 437 438 // ClientConnInterface defines the functions clients need to perform unary and 439 // streaming RPCs. It is implemented by *ClientConn, and is only intended to 440 // be referenced by generated code. 441 type ClientConnInterface interface { 442 // Invoke performs a unary RPC and returns after the response is received 443 // into reply. 444 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error 445 // NewStream begins a streaming RPC. 446 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) 447 } 448 449 // Assert *ClientConn implements ClientConnInterface. 450 var _ ClientConnInterface = (*ClientConn)(nil) 451 452 // ClientConn represents a virtual connection to a conceptual endpoint, to 453 // perform RPCs. 454 // 455 // A ClientConn is free to have zero or more actual connections to the endpoint 456 // based on configuration, load, etc. It is also free to determine which actual 457 // endpoints to use and may change it every RPC, permitting client-side load 458 // balancing. 459 // 460 // A ClientConn encapsulates a range of functionality including name 461 // resolution, TCP connection establishment (with retries and backoff) and TLS 462 // handshakes. It also handles errors on established connections by 463 // re-resolving the name and reconnecting. 464 type ClientConn struct { 465 ctx context.Context // Initialized using the background context at dial time. 466 cancel context.CancelFunc // Cancelled on close. 467 468 // The following are initialized at dial time, and are read-only after that. 469 target string // User's dial target. 470 parsedTarget resolver.Target // See parseTargetAndFindResolver(). 471 authority string // See determineAuthority(). 472 dopts dialOptions // Default and user specified dial options. 473 channelzID *channelz.Identifier // Channelz identifier for the channel. 474 balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. 475 476 // The following provide their own synchronization, and therefore don't 477 // require cc.mu to be held to access them. 478 csMgr *connectivityStateManager 479 blockingpicker *pickerWrapper 480 safeConfigSelector iresolver.SafeConfigSelector 481 czData *channelzData 482 retryThrottler atomic.Value // Updated from service config. 483 484 // firstResolveEvent is used to track whether the name resolver sent us at 485 // least one update. RPCs block on this event. 486 firstResolveEvent *grpcsync.Event 487 488 // mu protects the following fields. 489 // TODO: split mu so the same mutex isn't used for everything. 490 mu sync.RWMutex 491 resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close. 492 sc *ServiceConfig // Latest service config received from the resolver. 493 conns map[*addrConn]struct{} // Set to nil on close. 494 mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. 495 496 lceMu sync.Mutex // protects lastConnectionError 497 lastConnectionError error 498 } 499 500 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 501 // ctx expires. A true value is returned in former case and false in latter. 502 // 503 // # Experimental 504 // 505 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 506 // later release. 507 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 508 ch := cc.csMgr.getNotifyChan() 509 if cc.csMgr.getState() != sourceState { 510 return true 511 } 512 select { 513 case <-ctx.Done(): 514 return false 515 case <-ch: 516 return true 517 } 518 } 519 520 // GetState returns the connectivity.State of ClientConn. 521 // 522 // # Experimental 523 // 524 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later 525 // release. 526 func (cc *ClientConn) GetState() connectivity.State { 527 return cc.csMgr.getState() 528 } 529 530 // Connect causes all subchannels in the ClientConn to attempt to connect if 531 // the channel is idle. Does not wait for the connection attempts to begin 532 // before returning. 533 // 534 // # Experimental 535 // 536 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later 537 // release. 538 func (cc *ClientConn) Connect() { 539 cc.balancerWrapper.exitIdle() 540 } 541 542 func (cc *ClientConn) scWatcher() { 543 for { 544 select { 545 case sc, ok := <-cc.dopts.scChan: 546 if !ok { 547 return 548 } 549 cc.mu.Lock() 550 // TODO: load balance policy runtime change is ignored. 551 // We may revisit this decision in the future. 552 cc.sc = &sc 553 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) 554 cc.mu.Unlock() 555 case <-cc.ctx.Done(): 556 return 557 } 558 } 559 } 560 561 // waitForResolvedAddrs blocks until the resolver has provided addresses or the 562 // context expires. Returns nil unless the context expires first; otherwise 563 // returns a status error based on the context. 564 func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 565 // This is on the RPC path, so we use a fast path to avoid the 566 // more-expensive "select" below after the resolver has returned once. 567 if cc.firstResolveEvent.HasFired() { 568 return nil 569 } 570 select { 571 case <-cc.firstResolveEvent.Done(): 572 return nil 573 case <-ctx.Done(): 574 return status.FromContextError(ctx.Err()).Err() 575 case <-cc.ctx.Done(): 576 return ErrClientConnClosing 577 } 578 } 579 580 var emptyServiceConfig *ServiceConfig 581 582 func init() { 583 cfg := parseServiceConfig("{}") 584 if cfg.Err != nil { 585 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 586 } 587 emptyServiceConfig = cfg.Config.(*ServiceConfig) 588 } 589 590 func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 591 if cc.sc != nil { 592 cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs) 593 return 594 } 595 if cc.dopts.defaultServiceConfig != nil { 596 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs) 597 } else { 598 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs) 599 } 600 } 601 602 func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 603 defer cc.firstResolveEvent.Fire() 604 cc.mu.Lock() 605 // Check if the ClientConn is already closed. Some fields (e.g. 606 // balancerWrapper) are set to nil when closing the ClientConn, and could 607 // cause nil pointer panic if we don't have this check. 608 if cc.conns == nil { 609 cc.mu.Unlock() 610 return nil 611 } 612 613 if err != nil { 614 // May need to apply the initial service config in case the resolver 615 // doesn't support service configs, or doesn't provide a service config 616 // with the new addresses. 617 cc.maybeApplyDefaultServiceConfig(nil) 618 619 cc.balancerWrapper.resolverError(err) 620 621 // No addresses are valid with err set; return early. 622 cc.mu.Unlock() 623 return balancer.ErrBadResolverState 624 } 625 626 var ret error 627 if cc.dopts.disableServiceConfig { 628 channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) 629 cc.maybeApplyDefaultServiceConfig(s.Addresses) 630 } else if s.ServiceConfig == nil { 631 cc.maybeApplyDefaultServiceConfig(s.Addresses) 632 // TODO: do we need to apply a failing LB policy if there is no 633 // default, per the error handling design? 634 } else { 635 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 636 configSelector := iresolver.GetConfigSelector(s) 637 if configSelector != nil { 638 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { 639 channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") 640 } 641 } else { 642 configSelector = &defaultConfigSelector{sc} 643 } 644 cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) 645 } else { 646 ret = balancer.ErrBadResolverState 647 if cc.sc == nil { 648 // Apply the failing LB only if we haven't received valid service config 649 // from the name resolver in the past. 650 cc.applyFailingLB(s.ServiceConfig) 651 cc.mu.Unlock() 652 return ret 653 } 654 } 655 } 656 657 var balCfg serviceconfig.LoadBalancingConfig 658 if cc.sc != nil && cc.sc.lbConfig != nil { 659 balCfg = cc.sc.lbConfig.cfg 660 } 661 bw := cc.balancerWrapper 662 cc.mu.Unlock() 663 664 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 665 if ret == nil { 666 ret = uccsErr // prefer ErrBadResolver state since any other error is 667 // currently meaningless to the caller. 668 } 669 return ret 670 } 671 672 // applyFailingLB is akin to configuring an LB policy on the channel which 673 // always fails RPCs. Here, an actual LB policy is not configured, but an always 674 // erroring picker is configured, which returns errors with information about 675 // what was invalid in the received service config. A config selector with no 676 // service config is configured, and the connectivity state of the channel is 677 // set to TransientFailure. 678 // 679 // Caller must hold cc.mu. 680 func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { 681 var err error 682 if sc.Err != nil { 683 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) 684 } else { 685 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config) 686 } 687 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) 688 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 689 cc.csMgr.updateState(connectivity.TransientFailure) 690 } 691 692 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 693 cc.balancerWrapper.updateSubConnState(sc, s, err) 694 } 695 696 // newAddrConn creates an addrConn for addrs and adds it to cc.conns. 697 // 698 // Caller needs to make sure len(addrs) > 0. 699 func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 700 ac := &addrConn{ 701 state: connectivity.Idle, 702 cc: cc, 703 addrs: addrs, 704 scopts: opts, 705 dopts: cc.dopts, 706 czData: new(channelzData), 707 resetBackoff: make(chan struct{}), 708 } 709 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 710 // Track ac in cc. This needs to be done before any getTransport(...) is called. 711 cc.mu.Lock() 712 defer cc.mu.Unlock() 713 if cc.conns == nil { 714 return nil, ErrClientConnClosing 715 } 716 717 var err error 718 ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") 719 if err != nil { 720 return nil, err 721 } 722 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ 723 Desc: "Subchannel created", 724 Severity: channelz.CtInfo, 725 Parent: &channelz.TraceEventDesc{ 726 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()), 727 Severity: channelz.CtInfo, 728 }, 729 }) 730 731 cc.conns[ac] = struct{}{} 732 return ac, nil 733 } 734 735 // removeAddrConn removes the addrConn in the subConn from clientConn. 736 // It also tears down the ac with the given error. 737 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 738 cc.mu.Lock() 739 if cc.conns == nil { 740 cc.mu.Unlock() 741 return 742 } 743 delete(cc.conns, ac) 744 cc.mu.Unlock() 745 ac.tearDown(err) 746 } 747 748 func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 749 return &channelz.ChannelInternalMetric{ 750 State: cc.GetState(), 751 Target: cc.target, 752 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 753 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 754 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 755 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 756 } 757 } 758 759 // Target returns the target string of the ClientConn. 760 // 761 // # Experimental 762 // 763 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 764 // later release. 765 func (cc *ClientConn) Target() string { 766 return cc.target 767 } 768 769 func (cc *ClientConn) incrCallsStarted() { 770 atomic.AddInt64(&cc.czData.callsStarted, 1) 771 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 772 } 773 774 func (cc *ClientConn) incrCallsSucceeded() { 775 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 776 } 777 778 func (cc *ClientConn) incrCallsFailed() { 779 atomic.AddInt64(&cc.czData.callsFailed, 1) 780 } 781 782 // connect starts creating a transport. 783 // It does nothing if the ac is not IDLE. 784 // TODO(bar) Move this to the addrConn section. 785 func (ac *addrConn) connect() error { 786 ac.mu.Lock() 787 if ac.state == connectivity.Shutdown { 788 if logger.V(2) { 789 logger.Infof("connect called on shutdown addrConn; ignoring.") 790 } 791 ac.mu.Unlock() 792 return errConnClosing 793 } 794 if ac.state != connectivity.Idle { 795 if logger.V(2) { 796 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state) 797 } 798 ac.mu.Unlock() 799 return nil 800 } 801 // Update connectivity state within the lock to prevent subsequent or 802 // concurrent calls from resetting the transport more than once. 803 ac.updateConnectivityState(connectivity.Connecting, nil) 804 ac.mu.Unlock() 805 806 ac.resetTransport() 807 return nil 808 } 809 810 func equalAddresses(a, b []resolver.Address) bool { 811 if len(a) != len(b) { 812 return false 813 } 814 for i, v := range a { 815 if !v.Equal(b[i]) { 816 return false 817 } 818 } 819 return true 820 } 821 822 // tryUpdateAddrs tries to update ac.addrs with the new addresses list. 823 // 824 // If ac is TransientFailure, it updates ac.addrs and returns true. The updated 825 // addresses will be picked up by retry in the next iteration after backoff. 826 // 827 // If ac is Shutdown or Idle, it updates ac.addrs and returns true. 828 // 829 // If the addresses is the same as the old list, it does nothing and returns 830 // true. 831 // 832 // If ac is Connecting, it returns false. The caller should tear down the ac and 833 // create a new one. Note that the backoff will be reset when this happens. 834 // 835 // If ac is Ready, it checks whether current connected address of ac is in the 836 // new addrs list. 837 // - If true, it updates ac.addrs and returns true. The ac will keep using 838 // the existing connection. 839 // - If false, it does nothing and returns false. 840 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 841 ac.mu.Lock() 842 defer ac.mu.Unlock() 843 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 844 if ac.state == connectivity.Shutdown || 845 ac.state == connectivity.TransientFailure || 846 ac.state == connectivity.Idle { 847 ac.addrs = addrs 848 return true 849 } 850 851 if equalAddresses(ac.addrs, addrs) { 852 return true 853 } 854 855 if ac.state == connectivity.Connecting { 856 return false 857 } 858 859 // ac.state is Ready, try to find the connected address. 860 var curAddrFound bool 861 for _, a := range addrs { 862 a.ServerName = ac.cc.getServerName(a) 863 if reflect.DeepEqual(ac.curAddr, a) { 864 curAddrFound = true 865 break 866 } 867 } 868 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 869 if curAddrFound { 870 ac.addrs = addrs 871 } 872 873 return curAddrFound 874 } 875 876 // getServerName determines the serverName to be used in the connection 877 // handshake. The default value for the serverName is the authority on the 878 // ClientConn, which either comes from the user's dial target or through an 879 // authority override specified using the WithAuthority dial option. Name 880 // resolvers can specify a per-address override for the serverName through the 881 // resolver.Address.ServerName field which is used only if the WithAuthority 882 // dial option was not used. The rationale is that per-address authority 883 // overrides specified by the name resolver can represent a security risk, while 884 // an override specified by the user is more dependable since they probably know 885 // what they are doing. 886 func (cc *ClientConn) getServerName(addr resolver.Address) string { 887 if cc.dopts.authority != "" { 888 return cc.dopts.authority 889 } 890 if addr.ServerName != "" { 891 return addr.ServerName 892 } 893 return cc.authority 894 } 895 896 func getMethodConfig(sc *ServiceConfig, method string) MethodConfig { 897 if sc == nil { 898 return MethodConfig{} 899 } 900 if m, ok := sc.Methods[method]; ok { 901 return m 902 } 903 i := strings.LastIndex(method, "/") 904 if m, ok := sc.Methods[method[:i+1]]; ok { 905 return m 906 } 907 return sc.Methods[""] 908 } 909 910 // GetMethodConfig gets the method config of the input method. 911 // If there's an exact match for input method (i.e. /service/method), we return 912 // the corresponding MethodConfig. 913 // If there isn't an exact match for the input method, we look for the service's default 914 // config under the service (i.e /service/) and then for the default for all services (empty string). 915 // 916 // If there is a default MethodConfig for the service, we return it. 917 // Otherwise, we return an empty MethodConfig. 918 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 919 // TODO: Avoid the locking here. 920 cc.mu.RLock() 921 defer cc.mu.RUnlock() 922 return getMethodConfig(cc.sc, method) 923 } 924 925 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 926 cc.mu.RLock() 927 defer cc.mu.RUnlock() 928 if cc.sc == nil { 929 return nil 930 } 931 return cc.sc.healthCheckConfig 932 } 933 934 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { 935 return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ 936 Ctx: ctx, 937 FullMethodName: method, 938 }) 939 } 940 941 func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { 942 if sc == nil { 943 // should never reach here. 944 return 945 } 946 cc.sc = sc 947 if configSelector != nil { 948 cc.safeConfigSelector.UpdateConfigSelector(configSelector) 949 } 950 951 if cc.sc.retryThrottling != nil { 952 newThrottler := &retryThrottler{ 953 tokens: cc.sc.retryThrottling.MaxTokens, 954 max: cc.sc.retryThrottling.MaxTokens, 955 thresh: cc.sc.retryThrottling.MaxTokens / 2, 956 ratio: cc.sc.retryThrottling.TokenRatio, 957 } 958 cc.retryThrottler.Store(newThrottler) 959 } else { 960 cc.retryThrottler.Store((*retryThrottler)(nil)) 961 } 962 963 var newBalancerName string 964 if cc.sc != nil && cc.sc.lbConfig != nil { 965 newBalancerName = cc.sc.lbConfig.name 966 } else { 967 var isGRPCLB bool 968 for _, a := range addrs { 969 if a.Type == resolver.GRPCLB { 970 isGRPCLB = true 971 break 972 } 973 } 974 if isGRPCLB { 975 newBalancerName = grpclbName 976 } else if cc.sc != nil && cc.sc.LB != nil { 977 newBalancerName = *cc.sc.LB 978 } else { 979 newBalancerName = PickFirstBalancerName 980 } 981 } 982 cc.balancerWrapper.switchTo(newBalancerName) 983 } 984 985 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { 986 cc.mu.RLock() 987 r := cc.resolverWrapper 988 cc.mu.RUnlock() 989 if r == nil { 990 return 991 } 992 go r.resolveNow(o) 993 } 994 995 // ResetConnectBackoff wakes up all subchannels in transient failure and causes 996 // them to attempt another connection immediately. It also resets the backoff 997 // times used for subsequent attempts regardless of the current state. 998 // 999 // In general, this function should not be used. Typical service or network 1000 // outages result in a reasonable client reconnection strategy by default. 1001 // However, if a previously unavailable network becomes available, this may be 1002 // used to trigger an immediate reconnect. 1003 // 1004 // # Experimental 1005 // 1006 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 1007 // later release. 1008 func (cc *ClientConn) ResetConnectBackoff() { 1009 cc.mu.Lock() 1010 conns := cc.conns 1011 cc.mu.Unlock() 1012 for ac := range conns { 1013 ac.resetConnectBackoff() 1014 } 1015 } 1016 1017 // Close tears down the ClientConn and all underlying connections. 1018 func (cc *ClientConn) Close() error { 1019 defer cc.cancel() 1020 1021 cc.mu.Lock() 1022 if cc.conns == nil { 1023 cc.mu.Unlock() 1024 return ErrClientConnClosing 1025 } 1026 conns := cc.conns 1027 cc.conns = nil 1028 cc.csMgr.updateState(connectivity.Shutdown) 1029 1030 rWrapper := cc.resolverWrapper 1031 cc.resolverWrapper = nil 1032 bWrapper := cc.balancerWrapper 1033 cc.mu.Unlock() 1034 1035 // The order of closing matters here since the balancer wrapper assumes the 1036 // picker is closed before it is closed. 1037 cc.blockingpicker.close() 1038 if bWrapper != nil { 1039 bWrapper.close() 1040 } 1041 if rWrapper != nil { 1042 rWrapper.close() 1043 } 1044 1045 for ac := range conns { 1046 ac.tearDown(ErrClientConnClosing) 1047 } 1048 ted := &channelz.TraceEventDesc{ 1049 Desc: "Channel deleted", 1050 Severity: channelz.CtInfo, 1051 } 1052 if cc.dopts.channelzParentID != nil { 1053 ted.Parent = &channelz.TraceEventDesc{ 1054 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()), 1055 Severity: channelz.CtInfo, 1056 } 1057 } 1058 channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) 1059 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add 1060 // trace reference to the entity being deleted, and thus prevent it from being 1061 // deleted right away. 1062 channelz.RemoveEntry(cc.channelzID) 1063 1064 return nil 1065 } 1066 1067 // addrConn is a network connection to a given address. 1068 type addrConn struct { 1069 ctx context.Context 1070 cancel context.CancelFunc 1071 1072 cc *ClientConn 1073 dopts dialOptions 1074 acbw balancer.SubConn 1075 scopts balancer.NewSubConnOptions 1076 1077 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1078 // health checking may require server to report healthy to set ac to READY), and is reset 1079 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1080 // is received, transport is closed, ac has been torn down). 1081 transport transport.ClientTransport // The current transport. 1082 1083 mu sync.Mutex 1084 curAddr resolver.Address // The current address. 1085 addrs []resolver.Address // All addresses that the resolver resolved to. 1086 1087 // Use updateConnectivityState for updating addrConn's connectivity state. 1088 state connectivity.State 1089 1090 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1091 resetBackoff chan struct{} 1092 1093 channelzID *channelz.Identifier 1094 czData *channelzData 1095 } 1096 1097 // Note: this requires a lock on ac.mu. 1098 func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1099 if ac.state == s { 1100 return 1101 } 1102 ac.state = s 1103 if lastErr == nil { 1104 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s) 1105 } else { 1106 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) 1107 } 1108 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1109 } 1110 1111 // adjustParams updates parameters used to create transports upon 1112 // receiving a GoAway. 1113 func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1114 switch r { 1115 case transport.GoAwayTooManyPings: 1116 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1117 ac.cc.mu.Lock() 1118 if v > ac.cc.mkp.Time { 1119 ac.cc.mkp.Time = v 1120 } 1121 ac.cc.mu.Unlock() 1122 } 1123 } 1124 1125 func (ac *addrConn) resetTransport() { 1126 ac.mu.Lock() 1127 if ac.state == connectivity.Shutdown { 1128 ac.mu.Unlock() 1129 return 1130 } 1131 1132 addrs := ac.addrs 1133 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1134 // This will be the duration that dial gets to finish. 1135 dialDuration := minConnectTimeout 1136 if ac.dopts.minConnectTimeout != nil { 1137 dialDuration = ac.dopts.minConnectTimeout() 1138 } 1139 1140 if dialDuration < backoffFor { 1141 // Give dial more time as we keep failing to connect. 1142 dialDuration = backoffFor 1143 } 1144 // We can potentially spend all the time trying the first address, and 1145 // if the server accepts the connection and then hangs, the following 1146 // addresses will never be tried. 1147 // 1148 // The spec doesn't mention what should be done for multiple addresses. 1149 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1150 connectDeadline := time.Now().Add(dialDuration) 1151 1152 ac.updateConnectivityState(connectivity.Connecting, nil) 1153 ac.mu.Unlock() 1154 1155 if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil { 1156 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1157 // After exhausting all addresses, the addrConn enters 1158 // TRANSIENT_FAILURE. 1159 ac.mu.Lock() 1160 if ac.state == connectivity.Shutdown { 1161 ac.mu.Unlock() 1162 return 1163 } 1164 ac.updateConnectivityState(connectivity.TransientFailure, err) 1165 1166 // Backoff. 1167 b := ac.resetBackoff 1168 ac.mu.Unlock() 1169 1170 timer := time.NewTimer(backoffFor) 1171 select { 1172 case <-timer.C: 1173 ac.mu.Lock() 1174 ac.backoffIdx++ 1175 ac.mu.Unlock() 1176 case <-b: 1177 timer.Stop() 1178 case <-ac.ctx.Done(): 1179 timer.Stop() 1180 return 1181 } 1182 1183 ac.mu.Lock() 1184 if ac.state != connectivity.Shutdown { 1185 ac.updateConnectivityState(connectivity.Idle, err) 1186 } 1187 ac.mu.Unlock() 1188 return 1189 } 1190 // Success; reset backoff. 1191 ac.mu.Lock() 1192 ac.backoffIdx = 0 1193 ac.mu.Unlock() 1194 } 1195 1196 // tryAllAddrs tries to creates a connection to the addresses, and stop when at 1197 // the first successful one. It returns an error if no address was successfully 1198 // connected, or updates ac appropriately with the new transport. 1199 func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error { 1200 var firstConnErr error 1201 for _, addr := range addrs { 1202 ac.mu.Lock() 1203 if ac.state == connectivity.Shutdown { 1204 ac.mu.Unlock() 1205 return errConnClosing 1206 } 1207 1208 ac.cc.mu.RLock() 1209 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1210 ac.cc.mu.RUnlock() 1211 1212 copts := ac.dopts.copts 1213 if ac.scopts.CredsBundle != nil { 1214 copts.CredsBundle = ac.scopts.CredsBundle 1215 } 1216 ac.mu.Unlock() 1217 1218 channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) 1219 1220 err := ac.createTransport(addr, copts, connectDeadline) 1221 if err == nil { 1222 return nil 1223 } 1224 if firstConnErr == nil { 1225 firstConnErr = err 1226 } 1227 ac.cc.updateConnectionError(err) 1228 } 1229 1230 // Couldn't connect to any address. 1231 return firstConnErr 1232 } 1233 1234 // createTransport creates a connection to addr. It returns an error if the 1235 // address was not successfully connected, or updates ac appropriately with the 1236 // new transport. 1237 func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { 1238 addr.ServerName = ac.cc.getServerName(addr) 1239 hctx, hcancel := context.WithCancel(ac.ctx) 1240 1241 onClose := func(r transport.GoAwayReason) { 1242 ac.mu.Lock() 1243 defer ac.mu.Unlock() 1244 // adjust params based on GoAwayReason 1245 ac.adjustParams(r) 1246 if ac.state == connectivity.Shutdown { 1247 // Already shut down. tearDown() already cleared the transport and 1248 // canceled hctx via ac.ctx, and we expected this connection to be 1249 // closed, so do nothing here. 1250 return 1251 } 1252 hcancel() 1253 if ac.transport == nil { 1254 // We're still connecting to this address, which could error. Do 1255 // not update the connectivity state or resolve; these will happen 1256 // at the end of the tryAllAddrs connection loop in the event of an 1257 // error. 1258 return 1259 } 1260 ac.transport = nil 1261 // Refresh the name resolver on any connection loss. 1262 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1263 // Always go idle and wait for the LB policy to initiate a new 1264 // connection attempt. 1265 ac.updateConnectivityState(connectivity.Idle, nil) 1266 } 1267 1268 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1269 defer cancel() 1270 copts.ChannelzParentID = ac.channelzID 1271 1272 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose) 1273 if err != nil { 1274 if logger.V(2) { 1275 logger.Infof("Creating new client transport to %q: %v", addr, err) 1276 } 1277 // newTr is either nil, or closed. 1278 hcancel() 1279 channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err) 1280 return err 1281 } 1282 1283 ac.mu.Lock() 1284 defer ac.mu.Unlock() 1285 if ac.state == connectivity.Shutdown { 1286 // This can happen if the subConn was removed while in `Connecting` 1287 // state. tearDown() would have set the state to `Shutdown`, but 1288 // would not have closed the transport since ac.transport would not 1289 // have been set at that point. 1290 // 1291 // We run this in a goroutine because newTr.Close() calls onClose() 1292 // inline, which requires locking ac.mu. 1293 // 1294 // The error we pass to Close() is immaterial since there are no open 1295 // streams at this point, so no trailers with error details will be sent 1296 // out. We just need to pass a non-nil error. 1297 go newTr.Close(transport.ErrConnClosing) 1298 return nil 1299 } 1300 if hctx.Err() != nil { 1301 // onClose was already called for this connection, but the connection 1302 // was successfully established first. Consider it a success and set 1303 // the new state to Idle. 1304 ac.updateConnectivityState(connectivity.Idle, nil) 1305 return nil 1306 } 1307 ac.curAddr = addr 1308 ac.transport = newTr 1309 ac.startHealthCheck(hctx) // Will set state to READY if appropriate. 1310 return nil 1311 } 1312 1313 // startHealthCheck starts the health checking stream (RPC) to watch the health 1314 // stats of this connection if health checking is requested and configured. 1315 // 1316 // LB channel health checking is enabled when all requirements below are met: 1317 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1318 // 2. internal.HealthCheckFunc is set by importing the grpc/health package 1319 // 3. a service config with non-empty healthCheckConfig field is provided 1320 // 4. the load balancer requests it 1321 // 1322 // It sets addrConn to READY if the health checking stream is not started. 1323 // 1324 // Caller must hold ac.mu. 1325 func (ac *addrConn) startHealthCheck(ctx context.Context) { 1326 var healthcheckManagingState bool 1327 defer func() { 1328 if !healthcheckManagingState { 1329 ac.updateConnectivityState(connectivity.Ready, nil) 1330 } 1331 }() 1332 1333 if ac.cc.dopts.disableHealthCheck { 1334 return 1335 } 1336 healthCheckConfig := ac.cc.healthCheckConfig() 1337 if healthCheckConfig == nil { 1338 return 1339 } 1340 if !ac.scopts.HealthCheckEnabled { 1341 return 1342 } 1343 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1344 if healthCheckFunc == nil { 1345 // The health package is not imported to set health check function. 1346 // 1347 // TODO: add a link to the health check doc in the error message. 1348 channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.") 1349 return 1350 } 1351 1352 healthcheckManagingState = true 1353 1354 // Set up the health check helper functions. 1355 currentTr := ac.transport 1356 newStream := func(method string) (interface{}, error) { 1357 ac.mu.Lock() 1358 if ac.transport != currentTr { 1359 ac.mu.Unlock() 1360 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1361 } 1362 ac.mu.Unlock() 1363 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1364 } 1365 setConnectivityState := func(s connectivity.State, lastErr error) { 1366 ac.mu.Lock() 1367 defer ac.mu.Unlock() 1368 if ac.transport != currentTr { 1369 return 1370 } 1371 ac.updateConnectivityState(s, lastErr) 1372 } 1373 // Start the health checking stream. 1374 go func() { 1375 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1376 if err != nil { 1377 if status.Code(err) == codes.Unimplemented { 1378 channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled") 1379 } else { 1380 channelz.Errorf(logger, ac.channelzID, "Health checking failed: %v", err) 1381 } 1382 } 1383 }() 1384 } 1385 1386 func (ac *addrConn) resetConnectBackoff() { 1387 ac.mu.Lock() 1388 close(ac.resetBackoff) 1389 ac.backoffIdx = 0 1390 ac.resetBackoff = make(chan struct{}) 1391 ac.mu.Unlock() 1392 } 1393 1394 // getReadyTransport returns the transport if ac's state is READY or nil if not. 1395 func (ac *addrConn) getReadyTransport() transport.ClientTransport { 1396 ac.mu.Lock() 1397 defer ac.mu.Unlock() 1398 if ac.state == connectivity.Ready { 1399 return ac.transport 1400 } 1401 return nil 1402 } 1403 1404 // tearDown starts to tear down the addrConn. 1405 // 1406 // Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct 1407 // will leak. In most cases, call cc.removeAddrConn() instead. 1408 func (ac *addrConn) tearDown(err error) { 1409 ac.mu.Lock() 1410 if ac.state == connectivity.Shutdown { 1411 ac.mu.Unlock() 1412 return 1413 } 1414 curTr := ac.transport 1415 ac.transport = nil 1416 // We have to set the state to Shutdown before anything else to prevent races 1417 // between setting the state and logic that waits on context cancellation / etc. 1418 ac.updateConnectivityState(connectivity.Shutdown, nil) 1419 ac.cancel() 1420 ac.curAddr = resolver.Address{} 1421 if err == errConnDrain && curTr != nil { 1422 // GracefulClose(...) may be executed multiple times when 1423 // i) receiving multiple GoAway frames from the server; or 1424 // ii) there are concurrent name resolver/Balancer triggered 1425 // address removal and GoAway. 1426 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1427 ac.mu.Unlock() 1428 curTr.GracefulClose() 1429 ac.mu.Lock() 1430 } 1431 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ 1432 Desc: "Subchannel deleted", 1433 Severity: channelz.CtInfo, 1434 Parent: &channelz.TraceEventDesc{ 1435 Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()), 1436 Severity: channelz.CtInfo, 1437 }, 1438 }) 1439 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add 1440 // trace reference to the entity being deleted, and thus prevent it from 1441 // being deleted right away. 1442 channelz.RemoveEntry(ac.channelzID) 1443 ac.mu.Unlock() 1444 } 1445 1446 func (ac *addrConn) getState() connectivity.State { 1447 ac.mu.Lock() 1448 defer ac.mu.Unlock() 1449 return ac.state 1450 } 1451 1452 func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1453 ac.mu.Lock() 1454 addr := ac.curAddr.Addr 1455 ac.mu.Unlock() 1456 return &channelz.ChannelInternalMetric{ 1457 State: ac.getState(), 1458 Target: addr, 1459 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1460 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1461 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1462 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1463 } 1464 } 1465 1466 func (ac *addrConn) incrCallsStarted() { 1467 atomic.AddInt64(&ac.czData.callsStarted, 1) 1468 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1469 } 1470 1471 func (ac *addrConn) incrCallsSucceeded() { 1472 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1473 } 1474 1475 func (ac *addrConn) incrCallsFailed() { 1476 atomic.AddInt64(&ac.czData.callsFailed, 1) 1477 } 1478 1479 type retryThrottler struct { 1480 max float64 1481 thresh float64 1482 ratio float64 1483 1484 mu sync.Mutex 1485 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1486 } 1487 1488 // throttle subtracts a retry token from the pool and returns whether a retry 1489 // should be throttled (disallowed) based upon the retry throttling policy in 1490 // the service config. 1491 func (rt *retryThrottler) throttle() bool { 1492 if rt == nil { 1493 return false 1494 } 1495 rt.mu.Lock() 1496 defer rt.mu.Unlock() 1497 rt.tokens-- 1498 if rt.tokens < 0 { 1499 rt.tokens = 0 1500 } 1501 return rt.tokens <= rt.thresh 1502 } 1503 1504 func (rt *retryThrottler) successfulRPC() { 1505 if rt == nil { 1506 return 1507 } 1508 rt.mu.Lock() 1509 defer rt.mu.Unlock() 1510 rt.tokens += rt.ratio 1511 if rt.tokens > rt.max { 1512 rt.tokens = rt.max 1513 } 1514 } 1515 1516 type channelzChannel struct { 1517 cc *ClientConn 1518 } 1519 1520 func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1521 return c.cc.channelzMetric() 1522 } 1523 1524 // ErrClientConnTimeout indicates that the ClientConn cannot establish the 1525 // underlying connections within the specified timeout. 1526 // 1527 // Deprecated: This error is never returned by grpc and should not be 1528 // referenced by users. 1529 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1530 1531 // getResolver finds the scheme in the cc's resolvers or the global registry. 1532 // scheme should always be lowercase (typically by virtue of url.Parse() 1533 // performing proper RFC3986 behavior). 1534 func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1535 for _, rb := range cc.dopts.resolvers { 1536 if scheme == rb.Scheme() { 1537 return rb 1538 } 1539 } 1540 return resolver.Get(scheme) 1541 } 1542 1543 func (cc *ClientConn) updateConnectionError(err error) { 1544 cc.lceMu.Lock() 1545 cc.lastConnectionError = err 1546 cc.lceMu.Unlock() 1547 } 1548 1549 func (cc *ClientConn) connectionError() error { 1550 cc.lceMu.Lock() 1551 defer cc.lceMu.Unlock() 1552 return cc.lastConnectionError 1553 } 1554 1555 func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) { 1556 channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target) 1557 1558 var rb resolver.Builder 1559 parsedTarget, err := parseTarget(cc.target) 1560 if err != nil { 1561 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) 1562 } else { 1563 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) 1564 rb = cc.getResolver(parsedTarget.URL.Scheme) 1565 if rb != nil { 1566 cc.parsedTarget = parsedTarget 1567 return rb, nil 1568 } 1569 } 1570 1571 // We are here because the user's dial target did not contain a scheme or 1572 // specified an unregistered scheme. We should fallback to the default 1573 // scheme, except when a custom dialer is specified in which case, we should 1574 // always use passthrough scheme. 1575 defScheme := resolver.GetDefaultScheme() 1576 channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme) 1577 canonicalTarget := defScheme + ":///" + cc.target 1578 1579 parsedTarget, err = parseTarget(canonicalTarget) 1580 if err != nil { 1581 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err) 1582 return nil, err 1583 } 1584 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) 1585 rb = cc.getResolver(parsedTarget.URL.Scheme) 1586 if rb == nil { 1587 return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme) 1588 } 1589 cc.parsedTarget = parsedTarget 1590 return rb, nil 1591 } 1592 1593 // parseTarget uses RFC 3986 semantics to parse the given target into a 1594 // resolver.Target struct containing scheme, authority and url. Query 1595 // params are stripped from the endpoint. 1596 func parseTarget(target string) (resolver.Target, error) { 1597 u, err := url.Parse(target) 1598 if err != nil { 1599 return resolver.Target{}, err 1600 } 1601 1602 return resolver.Target{ 1603 Scheme: u.Scheme, 1604 Authority: u.Host, 1605 URL: *u, 1606 }, nil 1607 } 1608 1609 // Determine channel authority. The order of precedence is as follows: 1610 // - user specified authority override using `WithAuthority` dial option 1611 // - creds' notion of server name for the authentication handshake 1612 // - endpoint from dial target of the form "scheme://[authority]/endpoint" 1613 func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) { 1614 // Historically, we had two options for users to specify the serverName or 1615 // authority for a channel. One was through the transport credentials 1616 // (either in its constructor, or through the OverrideServerName() method). 1617 // The other option (for cases where WithInsecure() dial option was used) 1618 // was to use the WithAuthority() dial option. 1619 // 1620 // A few things have changed since: 1621 // - `insecure` package with an implementation of the `TransportCredentials` 1622 // interface for the insecure case 1623 // - WithAuthority() dial option support for secure credentials 1624 authorityFromCreds := "" 1625 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" { 1626 authorityFromCreds = creds.Info().ServerName 1627 } 1628 authorityFromDialOption := dopts.authority 1629 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption { 1630 return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption) 1631 } 1632 1633 switch { 1634 case authorityFromDialOption != "": 1635 return authorityFromDialOption, nil 1636 case authorityFromCreds != "": 1637 return authorityFromCreds, nil 1638 case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"): 1639 // TODO: remove when the unix resolver implements optional interface to 1640 // return channel authority. 1641 return "localhost", nil 1642 case strings.HasPrefix(endpoint, ":"): 1643 return "localhost" + endpoint, nil 1644 default: 1645 // TODO: Define an optional interface on the resolver builder to return 1646 // the channel authority given the user's dial target. For resolvers 1647 // which don't implement this interface, we will use the endpoint from 1648 // "scheme://authority/endpoint" as the default authority. 1649 return endpoint, nil 1650 } 1651 }