balancer_conn_wrappers.go (16309B)
1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "context" 23 "fmt" 24 "strings" 25 "sync" 26 27 "google.golang.org/grpc/balancer" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/connectivity" 30 "google.golang.org/grpc/internal/balancer/gracefulswitch" 31 "google.golang.org/grpc/internal/buffer" 32 "google.golang.org/grpc/internal/channelz" 33 "google.golang.org/grpc/internal/grpcsync" 34 "google.golang.org/grpc/resolver" 35 "google.golang.org/grpc/status" 36 ) 37 38 // ccBalancerWrapper sits between the ClientConn and the Balancer. 39 // 40 // ccBalancerWrapper implements methods corresponding to the ones on the 41 // balancer.Balancer interface. The ClientConn is free to call these methods 42 // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn 43 // to the Balancer happen synchronously and in order. 44 // 45 // ccBalancerWrapper also implements the balancer.ClientConn interface and is 46 // passed to the Balancer implementations. It invokes unexported methods on the 47 // ClientConn to handle these calls from the Balancer. 48 // 49 // It uses the gracefulswitch.Balancer internally to ensure that balancer 50 // switches happen in a graceful manner. 51 type ccBalancerWrapper struct { 52 cc *ClientConn 53 54 // Since these fields are accessed only from handleXxx() methods which are 55 // synchronized by the watcher goroutine, we do not need a mutex to protect 56 // these fields. 57 balancer *gracefulswitch.Balancer 58 curBalancerName string 59 60 updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher(). 61 resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here. 62 closed *grpcsync.Event // Indicates if close has been called. 63 done *grpcsync.Event // Indicates if close has completed its work. 64 } 65 66 // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer 67 // is not created until the switchTo() method is invoked. 68 func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { 69 ccb := &ccBalancerWrapper{ 70 cc: cc, 71 updateCh: buffer.NewUnbounded(), 72 resultCh: buffer.NewUnbounded(), 73 closed: grpcsync.NewEvent(), 74 done: grpcsync.NewEvent(), 75 } 76 go ccb.watcher() 77 ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) 78 return ccb 79 } 80 81 // The following xxxUpdate structs wrap the arguments received as part of the 82 // corresponding update. The watcher goroutine uses the 'type' of the update to 83 // invoke the appropriate handler routine to handle the update. 84 85 type ccStateUpdate struct { 86 ccs *balancer.ClientConnState 87 } 88 89 type scStateUpdate struct { 90 sc balancer.SubConn 91 state connectivity.State 92 err error 93 } 94 95 type exitIdleUpdate struct{} 96 97 type resolverErrorUpdate struct { 98 err error 99 } 100 101 type switchToUpdate struct { 102 name string 103 } 104 105 type subConnUpdate struct { 106 acbw *acBalancerWrapper 107 } 108 109 // watcher is a long-running goroutine which reads updates from a channel and 110 // invokes corresponding methods on the underlying balancer. It ensures that 111 // these methods are invoked in a synchronous fashion. It also ensures that 112 // these methods are invoked in the order in which the updates were received. 113 func (ccb *ccBalancerWrapper) watcher() { 114 for { 115 select { 116 case u := <-ccb.updateCh.Get(): 117 ccb.updateCh.Load() 118 if ccb.closed.HasFired() { 119 break 120 } 121 switch update := u.(type) { 122 case *ccStateUpdate: 123 ccb.handleClientConnStateChange(update.ccs) 124 case *scStateUpdate: 125 ccb.handleSubConnStateChange(update) 126 case *exitIdleUpdate: 127 ccb.handleExitIdle() 128 case *resolverErrorUpdate: 129 ccb.handleResolverError(update.err) 130 case *switchToUpdate: 131 ccb.handleSwitchTo(update.name) 132 case *subConnUpdate: 133 ccb.handleRemoveSubConn(update.acbw) 134 default: 135 logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update) 136 } 137 case <-ccb.closed.Done(): 138 } 139 140 if ccb.closed.HasFired() { 141 ccb.handleClose() 142 return 143 } 144 } 145 } 146 147 // updateClientConnState is invoked by grpc to push a ClientConnState update to 148 // the underlying balancer. 149 // 150 // Unlike other methods invoked by grpc to push updates to the underlying 151 // balancer, this method cannot simply push the update onto the update channel 152 // and return. It needs to return the error returned by the underlying balancer 153 // back to grpc which propagates that to the resolver. 154 func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { 155 ccb.updateCh.Put(&ccStateUpdate{ccs: ccs}) 156 157 var res interface{} 158 select { 159 case res = <-ccb.resultCh.Get(): 160 ccb.resultCh.Load() 161 case <-ccb.closed.Done(): 162 // Return early if the balancer wrapper is closed while we are waiting for 163 // the underlying balancer to process a ClientConnState update. 164 return nil 165 } 166 // If the returned error is nil, attempting to type assert to error leads to 167 // panic. So, this needs to handled separately. 168 if res == nil { 169 return nil 170 } 171 return res.(error) 172 } 173 174 // handleClientConnStateChange handles a ClientConnState update from the update 175 // channel and invokes the appropriate method on the underlying balancer. 176 // 177 // If the addresses specified in the update contain addresses of type "grpclb" 178 // and the selected LB policy is not "grpclb", these addresses will be filtered 179 // out and ccs will be modified with the updated address list. 180 func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) { 181 if ccb.curBalancerName != grpclbName { 182 // Filter any grpclb addresses since we don't have the grpclb balancer. 183 var addrs []resolver.Address 184 for _, addr := range ccs.ResolverState.Addresses { 185 if addr.Type == resolver.GRPCLB { 186 continue 187 } 188 addrs = append(addrs, addr) 189 } 190 ccs.ResolverState.Addresses = addrs 191 } 192 ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs)) 193 } 194 195 // updateSubConnState is invoked by grpc to push a subConn state update to the 196 // underlying balancer. 197 func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { 198 // When updating addresses for a SubConn, if the address in use is not in 199 // the new addresses, the old ac will be tearDown() and a new ac will be 200 // created. tearDown() generates a state change with Shutdown state, we 201 // don't want the balancer to receive this state change. So before 202 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 203 // this function will be called with (nil, Shutdown). We don't need to call 204 // balancer method in this case. 205 if sc == nil { 206 return 207 } 208 ccb.updateCh.Put(&scStateUpdate{ 209 sc: sc, 210 state: s, 211 err: err, 212 }) 213 } 214 215 // handleSubConnStateChange handles a SubConnState update from the update 216 // channel and invokes the appropriate method on the underlying balancer. 217 func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { 218 ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err}) 219 } 220 221 func (ccb *ccBalancerWrapper) exitIdle() { 222 ccb.updateCh.Put(&exitIdleUpdate{}) 223 } 224 225 func (ccb *ccBalancerWrapper) handleExitIdle() { 226 if ccb.cc.GetState() != connectivity.Idle { 227 return 228 } 229 ccb.balancer.ExitIdle() 230 } 231 232 func (ccb *ccBalancerWrapper) resolverError(err error) { 233 ccb.updateCh.Put(&resolverErrorUpdate{err: err}) 234 } 235 236 func (ccb *ccBalancerWrapper) handleResolverError(err error) { 237 ccb.balancer.ResolverError(err) 238 } 239 240 // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the 241 // LB policy identified by name. 242 // 243 // ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the 244 // first good update from the name resolver, it determines the LB policy to use 245 // and invokes the switchTo() method. Upon receipt of every subsequent update 246 // from the name resolver, it invokes this method. 247 // 248 // the ccBalancerWrapper keeps track of the current LB policy name, and skips 249 // the graceful balancer switching process if the name does not change. 250 func (ccb *ccBalancerWrapper) switchTo(name string) { 251 ccb.updateCh.Put(&switchToUpdate{name: name}) 252 } 253 254 // handleSwitchTo handles a balancer switch update from the update channel. It 255 // calls the SwitchTo() method on the gracefulswitch.Balancer with a 256 // balancer.Builder corresponding to name. If no balancer.Builder is registered 257 // for the given name, it uses the default LB policy which is "pick_first". 258 func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { 259 // TODO: Other languages use case-insensitive balancer registries. We should 260 // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. 261 if strings.EqualFold(ccb.curBalancerName, name) { 262 return 263 } 264 265 // TODO: Ensure that name is a registered LB policy when we get here. 266 // We currently only validate the `loadBalancingConfig` field. We need to do 267 // the same for the `loadBalancingPolicy` field and reject the service config 268 // if the specified policy is not registered. 269 builder := balancer.Get(name) 270 if builder == nil { 271 channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) 272 builder = newPickfirstBuilder() 273 } else { 274 channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) 275 } 276 277 if err := ccb.balancer.SwitchTo(builder); err != nil { 278 channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) 279 return 280 } 281 ccb.curBalancerName = builder.Name() 282 } 283 284 // handleRemoveSucConn handles a request from the underlying balancer to remove 285 // a subConn. 286 // 287 // See comments in RemoveSubConn() for more details. 288 func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) { 289 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 290 } 291 292 func (ccb *ccBalancerWrapper) close() { 293 ccb.closed.Fire() 294 <-ccb.done.Done() 295 } 296 297 func (ccb *ccBalancerWrapper) handleClose() { 298 ccb.balancer.Close() 299 ccb.done.Fire() 300 } 301 302 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 303 if len(addrs) <= 0 { 304 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 305 } 306 ac, err := ccb.cc.newAddrConn(addrs, opts) 307 if err != nil { 308 channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) 309 return nil, err 310 } 311 acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} 312 acbw.ac.mu.Lock() 313 ac.acbw = acbw 314 acbw.ac.mu.Unlock() 315 return acbw, nil 316 } 317 318 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 319 // Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it 320 // was required to handle the RemoveSubConn() method asynchronously by pushing 321 // the update onto the update channel. This was done to avoid a deadlock as 322 // switchBalancer() was holding cc.mu when calling Close() on the old 323 // balancer, which would in turn call RemoveSubConn(). 324 // 325 // With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this 326 // asynchronously is probably not required anymore since the switchTo() method 327 // handles the balancer switch by pushing the update onto the channel. 328 // TODO(easwars): Handle this inline. 329 acbw, ok := sc.(*acBalancerWrapper) 330 if !ok { 331 return 332 } 333 ccb.updateCh.Put(&subConnUpdate{acbw: acbw}) 334 } 335 336 func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { 337 acbw, ok := sc.(*acBalancerWrapper) 338 if !ok { 339 return 340 } 341 acbw.UpdateAddresses(addrs) 342 } 343 344 func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { 345 // Update picker before updating state. Even though the ordering here does 346 // not matter, it can lead to multiple calls of Pick in the common start-up 347 // case where we wait for ready and then perform an RPC. If the picker is 348 // updated later, we could call the "connecting" picker when the state is 349 // updated, and then call the "ready" picker after the picker gets updated. 350 ccb.cc.blockingpicker.updatePicker(s.Picker) 351 ccb.cc.csMgr.updateState(s.ConnectivityState) 352 } 353 354 func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { 355 ccb.cc.resolveNow(o) 356 } 357 358 func (ccb *ccBalancerWrapper) Target() string { 359 return ccb.cc.target 360 } 361 362 // acBalancerWrapper is a wrapper on top of ac for balancers. 363 // It implements balancer.SubConn interface. 364 type acBalancerWrapper struct { 365 mu sync.Mutex 366 ac *addrConn 367 producers map[balancer.ProducerBuilder]*refCountedProducer 368 } 369 370 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 371 acbw.mu.Lock() 372 defer acbw.mu.Unlock() 373 if len(addrs) <= 0 { 374 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) 375 return 376 } 377 if !acbw.ac.tryUpdateAddrs(addrs) { 378 cc := acbw.ac.cc 379 opts := acbw.ac.scopts 380 acbw.ac.mu.Lock() 381 // Set old ac.acbw to nil so the Shutdown state update will be ignored 382 // by balancer. 383 // 384 // TODO(bar) the state transition could be wrong when tearDown() old ac 385 // and creating new ac, fix the transition. 386 acbw.ac.acbw = nil 387 acbw.ac.mu.Unlock() 388 acState := acbw.ac.getState() 389 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) 390 391 if acState == connectivity.Shutdown { 392 return 393 } 394 395 newAC, err := cc.newAddrConn(addrs, opts) 396 if err != nil { 397 channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 398 return 399 } 400 acbw.ac = newAC 401 newAC.mu.Lock() 402 newAC.acbw = acbw 403 newAC.mu.Unlock() 404 if acState != connectivity.Idle { 405 go newAC.connect() 406 } 407 } 408 } 409 410 func (acbw *acBalancerWrapper) Connect() { 411 acbw.mu.Lock() 412 defer acbw.mu.Unlock() 413 go acbw.ac.connect() 414 } 415 416 func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 417 acbw.mu.Lock() 418 defer acbw.mu.Unlock() 419 return acbw.ac 420 } 421 422 var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected") 423 424 // NewStream begins a streaming RPC on the addrConn. If the addrConn is not 425 // ready, returns errSubConnNotReady. 426 func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { 427 transport := acbw.ac.getReadyTransport() 428 if transport == nil { 429 return nil, errSubConnNotReady 430 } 431 return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...) 432 } 433 434 // Invoke performs a unary RPC. If the addrConn is not ready, returns 435 // errSubConnNotReady. 436 func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { 437 cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) 438 if err != nil { 439 return err 440 } 441 if err := cs.SendMsg(args); err != nil { 442 return err 443 } 444 return cs.RecvMsg(reply) 445 } 446 447 type refCountedProducer struct { 448 producer balancer.Producer 449 refs int // number of current refs to the producer 450 close func() // underlying producer's close function 451 } 452 453 func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { 454 acbw.mu.Lock() 455 defer acbw.mu.Unlock() 456 457 // Look up existing producer from this builder. 458 pData := acbw.producers[pb] 459 if pData == nil { 460 // Not found; create a new one and add it to the producers map. 461 p, close := pb.Build(acbw) 462 pData = &refCountedProducer{producer: p, close: close} 463 acbw.producers[pb] = pData 464 } 465 // Account for this new reference. 466 pData.refs++ 467 468 // Return a cleanup function wrapped in a OnceFunc to remove this reference 469 // and delete the refCountedProducer from the map if the total reference 470 // count goes to zero. 471 unref := func() { 472 acbw.mu.Lock() 473 pData.refs-- 474 if pData.refs == 0 { 475 defer pData.close() // Run outside the acbw mutex 476 delete(acbw.producers, pb) 477 } 478 acbw.mu.Unlock() 479 } 480 return pData.producer, grpcsync.OnceFunc(unref) 481 }