gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

gracefulswitch.go (13536B)


      1 /*
      2  *
      3  * Copyright 2022 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 // Package gracefulswitch implements a graceful switch load balancer.
     20 package gracefulswitch
     21 
     22 import (
     23 	"errors"
     24 	"fmt"
     25 	"sync"
     26 
     27 	"google.golang.org/grpc/balancer"
     28 	"google.golang.org/grpc/balancer/base"
     29 	"google.golang.org/grpc/connectivity"
     30 	"google.golang.org/grpc/resolver"
     31 )
     32 
// errBalancerClosed is returned by operations attempted after the graceful
// switch balancer has been closed.
var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")

// Compile-time assertion that *Balancer implements balancer.Balancer.
var _ balancer.Balancer = (*Balancer)(nil)
     35 
     36 // NewBalancer returns a graceful switch Balancer.
     37 func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
     38 	return &Balancer{
     39 		cc:    cc,
     40 		bOpts: opts,
     41 	}
     42 }
     43 
// Balancer is a utility to gracefully switch from one balancer to
// a new balancer. It implements the balancer.Balancer interface.
type Balancer struct {
	bOpts balancer.BuildOptions // build options passed to every child balancer
	cc    balancer.ClientConn   // parent ClientConn; state/picker updates are forwarded here

	// mu protects the following fields and all fields within balancerCurrent
	// and balancerPending. mu does not need to be held when calling into the
	// child balancers, as all calls into these children happen only as a direct
	// result of a call into the gracefulSwitchBalancer, which are also
	// guaranteed to be synchronous. There is one exception: an UpdateState call
	// from a child balancer when current and pending are populated can lead to
	// calling Close() on the current. To prevent that racing with an
	// UpdateSubConnState from the channel, we hold currentMu during Close and
	// UpdateSubConnState calls.
	mu              sync.Mutex
	balancerCurrent *balancerWrapper // child currently serving RPCs; may be nil
	balancerPending *balancerWrapper // child being switched to; nil when no switch is in progress
	closed          bool             // set to true when this balancer is closed

	// currentMu must be locked before mu. This mutex guards against this
	// sequence of events: UpdateSubConnState() called, finds the
	// balancerCurrent, gives up lock, updateState comes in, causes Close() on
	// balancerCurrent before the UpdateSubConnState is called on the
	// balancerCurrent.
	currentMu sync.Mutex
}
     71 
     72 // swap swaps out the current lb with the pending lb and updates the ClientConn.
     73 // The caller must hold gsb.mu.
     74 func (gsb *Balancer) swap() {
     75 	gsb.cc.UpdateState(gsb.balancerPending.lastState)
     76 	cur := gsb.balancerCurrent
     77 	gsb.balancerCurrent = gsb.balancerPending
     78 	gsb.balancerPending = nil
     79 	go func() {
     80 		gsb.currentMu.Lock()
     81 		defer gsb.currentMu.Unlock()
     82 		cur.Close()
     83 	}()
     84 }
     85 
     86 // Helper function that checks if the balancer passed in is current or pending.
     87 // The caller must hold gsb.mu.
     88 func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
     89 	return bw == gsb.balancerCurrent || bw == gsb.balancerPending
     90 }
     91 
     92 // SwitchTo initializes the graceful switch process, which completes based on
     93 // connectivity state changes on the current/pending balancer. Thus, the switch
     94 // process is not complete when this method returns. This method must be called
     95 // synchronously alongside the rest of the balancer.Balancer methods this
     96 // Graceful Switch Balancer implements.
     97 func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
     98 	gsb.mu.Lock()
     99 	if gsb.closed {
    100 		gsb.mu.Unlock()
    101 		return errBalancerClosed
    102 	}
    103 	bw := &balancerWrapper{
    104 		gsb: gsb,
    105 		lastState: balancer.State{
    106 			ConnectivityState: connectivity.Connecting,
    107 			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
    108 		},
    109 		subconns: make(map[balancer.SubConn]bool),
    110 	}
    111 	balToClose := gsb.balancerPending // nil if there is no pending balancer
    112 	if gsb.balancerCurrent == nil {
    113 		gsb.balancerCurrent = bw
    114 	} else {
    115 		gsb.balancerPending = bw
    116 	}
    117 	gsb.mu.Unlock()
    118 	balToClose.Close()
    119 	// This function takes a builder instead of a balancer because builder.Build
    120 	// can call back inline, and this utility needs to handle the callbacks.
    121 	newBalancer := builder.Build(bw, gsb.bOpts)
    122 	if newBalancer == nil {
    123 		// This is illegal and should never happen; we clear the balancerWrapper
    124 		// we were constructing if it happens to avoid a potential panic.
    125 		gsb.mu.Lock()
    126 		if gsb.balancerPending != nil {
    127 			gsb.balancerPending = nil
    128 		} else {
    129 			gsb.balancerCurrent = nil
    130 		}
    131 		gsb.mu.Unlock()
    132 		return balancer.ErrBadResolverState
    133 	}
    134 
    135 	// This write doesn't need to take gsb.mu because this field never gets read
    136 	// or written to on any calls from the current or pending. Calls from grpc
    137 	// to this balancer are guaranteed to be called synchronously, so this
    138 	// bw.Balancer field will never be forwarded to until this SwitchTo()
    139 	// function returns.
    140 	bw.Balancer = newBalancer
    141 	return nil
    142 }
    143 
    144 // Returns nil if the graceful switch balancer is closed.
    145 func (gsb *Balancer) latestBalancer() *balancerWrapper {
    146 	gsb.mu.Lock()
    147 	defer gsb.mu.Unlock()
    148 	if gsb.balancerPending != nil {
    149 		return gsb.balancerPending
    150 	}
    151 	return gsb.balancerCurrent
    152 }
    153 
    154 // UpdateClientConnState forwards the update to the latest balancer created.
    155 func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
    156 	// The resolver data is only relevant to the most recent LB Policy.
    157 	balToUpdate := gsb.latestBalancer()
    158 	if balToUpdate == nil {
    159 		return errBalancerClosed
    160 	}
    161 	// Perform this call without gsb.mu to prevent deadlocks if the child calls
    162 	// back into the channel. The latest balancer can never be closed during a
    163 	// call from the channel, even without gsb.mu held.
    164 	return balToUpdate.UpdateClientConnState(state)
    165 }
    166 
    167 // ResolverError forwards the error to the latest balancer created.
    168 func (gsb *Balancer) ResolverError(err error) {
    169 	// The resolver data is only relevant to the most recent LB Policy.
    170 	balToUpdate := gsb.latestBalancer()
    171 	if balToUpdate == nil {
    172 		return
    173 	}
    174 	// Perform this call without gsb.mu to prevent deadlocks if the child calls
    175 	// back into the channel. The latest balancer can never be closed during a
    176 	// call from the channel, even without gsb.mu held.
    177 	balToUpdate.ResolverError(err)
    178 }
    179 
    180 // ExitIdle forwards the call to the latest balancer created.
    181 //
    182 // If the latest balancer does not support ExitIdle, the subConns are
    183 // re-connected to manually.
    184 func (gsb *Balancer) ExitIdle() {
    185 	balToUpdate := gsb.latestBalancer()
    186 	if balToUpdate == nil {
    187 		return
    188 	}
    189 	// There is no need to protect this read with a mutex, as the write to the
    190 	// Balancer field happens in SwitchTo, which completes before this can be
    191 	// called.
    192 	if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
    193 		ei.ExitIdle()
    194 		return
    195 	}
    196 	gsb.mu.Lock()
    197 	defer gsb.mu.Unlock()
    198 	for sc := range balToUpdate.subconns {
    199 		sc.Connect()
    200 	}
    201 }
    202 
    203 // UpdateSubConnState forwards the update to the appropriate child.
    204 func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    205 	gsb.currentMu.Lock()
    206 	defer gsb.currentMu.Unlock()
    207 	gsb.mu.Lock()
    208 	// Forward update to the appropriate child.  Even if there is a pending
    209 	// balancer, the current balancer should continue to get SubConn updates to
    210 	// maintain the proper state while the pending is still connecting.
    211 	var balToUpdate *balancerWrapper
    212 	if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
    213 		balToUpdate = gsb.balancerCurrent
    214 	} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
    215 		balToUpdate = gsb.balancerPending
    216 	}
    217 	gsb.mu.Unlock()
    218 	if balToUpdate == nil {
    219 		// SubConn belonged to a stale lb policy that has not yet fully closed,
    220 		// or the balancer was already closed.
    221 		return
    222 	}
    223 	balToUpdate.UpdateSubConnState(sc, state)
    224 }
    225 
    226 // Close closes any active child balancers.
    227 func (gsb *Balancer) Close() {
    228 	gsb.mu.Lock()
    229 	gsb.closed = true
    230 	currentBalancerToClose := gsb.balancerCurrent
    231 	gsb.balancerCurrent = nil
    232 	pendingBalancerToClose := gsb.balancerPending
    233 	gsb.balancerPending = nil
    234 	gsb.mu.Unlock()
    235 
    236 	currentBalancerToClose.Close()
    237 	pendingBalancerToClose.Close()
    238 }
    239 
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
// methods to help cleanup SubConns created by the wrapped balancer.
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/remove
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
type balancerWrapper struct {
	balancer.Balancer // the wrapped child LB policy; assigned in SwitchTo after Build returns
	gsb *Balancer     // the parent graceful switch balancer

	lastState balancer.State            // most recent state reported by the wrapped balancer
	subconns  map[balancer.SubConn]bool // subconns created by this balancer
}
    256 
    257 func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    258 	if state.ConnectivityState == connectivity.Shutdown {
    259 		bw.gsb.mu.Lock()
    260 		delete(bw.subconns, sc)
    261 		bw.gsb.mu.Unlock()
    262 	}
    263 	// There is no need to protect this read with a mutex, as the write to the
    264 	// Balancer field happens in SwitchTo, which completes before this can be
    265 	// called.
    266 	bw.Balancer.UpdateSubConnState(sc, state)
    267 }
    268 
// Close closes the underlying LB policy and removes the subconns it created. bw
// must not be referenced via balancerCurrent or balancerPending in gsb when
// called. gsb.mu must not be held.  Does not panic with a nil receiver.
func (bw *balancerWrapper) Close() {
	// A nil receiver is allowed so callers can unconditionally close an
	// optional (possibly absent) child balancer.
	if bw == nil {
		return
	}
	// There is no need to protect this read with a mutex, as Close() is
	// impossible to be called concurrently with the write in SwitchTo(). The
	// callsites of Close() for this balancer in Graceful Switch Balancer will
	// never be called until SwitchTo() returns.
	bw.Balancer.Close()
	// Remove every SubConn this child created from the parent ClientConn.
	bw.gsb.mu.Lock()
	for sc := range bw.subconns {
		bw.gsb.cc.RemoveSubConn(sc)
	}
	bw.gsb.mu.Unlock()
}
    288 
    289 func (bw *balancerWrapper) UpdateState(state balancer.State) {
    290 	// Hold the mutex for this entire call to ensure it cannot occur
    291 	// concurrently with other updateState() calls. This causes updates to
    292 	// lastState and calls to cc.UpdateState to happen atomically.
    293 	bw.gsb.mu.Lock()
    294 	defer bw.gsb.mu.Unlock()
    295 	bw.lastState = state
    296 
    297 	if !bw.gsb.balancerCurrentOrPending(bw) {
    298 		return
    299 	}
    300 
    301 	if bw == bw.gsb.balancerCurrent {
    302 		// In the case that the current balancer exits READY, and there is a pending
    303 		// balancer, you can forward the pending balancer's cached State up to
    304 		// ClientConn and swap the pending into the current. This is because there
    305 		// is no reason to gracefully switch from and keep using the old policy as
    306 		// the ClientConn is not connected to any backends.
    307 		if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
    308 			bw.gsb.swap()
    309 			return
    310 		}
    311 		// Even if there is a pending balancer waiting to be gracefully switched to,
    312 		// continue to forward current balancer updates to the Client Conn. Ignoring
    313 		// state + picker from the current would cause undefined behavior/cause the
    314 		// system to behave incorrectly from the current LB policies perspective.
    315 		// Also, the current LB is still being used by grpc to choose SubConns per
    316 		// RPC, and thus should use the most updated form of the current balancer.
    317 		bw.gsb.cc.UpdateState(state)
    318 		return
    319 	}
    320 	// This method is now dealing with a state update from the pending balancer.
    321 	// If the current balancer is currently in a state other than READY, the new
    322 	// policy can be swapped into place immediately. This is because there is no
    323 	// reason to gracefully switch from and keep using the old policy as the
    324 	// ClientConn is not connected to any backends.
    325 	if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
    326 		bw.gsb.swap()
    327 	}
    328 }
    329 
    330 func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
    331 	bw.gsb.mu.Lock()
    332 	if !bw.gsb.balancerCurrentOrPending(bw) {
    333 		bw.gsb.mu.Unlock()
    334 		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
    335 	}
    336 	bw.gsb.mu.Unlock()
    337 
    338 	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
    339 	if err != nil {
    340 		return nil, err
    341 	}
    342 	bw.gsb.mu.Lock()
    343 	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
    344 		bw.gsb.cc.RemoveSubConn(sc)
    345 		bw.gsb.mu.Unlock()
    346 		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
    347 	}
    348 	bw.subconns[sc] = true
    349 	bw.gsb.mu.Unlock()
    350 	return sc, nil
    351 }
    352 
    353 func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
    354 	// Ignore ResolveNow requests from anything other than the most recent
    355 	// balancer, because older balancers were already removed from the config.
    356 	if bw != bw.gsb.latestBalancer() {
    357 		return
    358 	}
    359 	bw.gsb.cc.ResolveNow(opts)
    360 }
    361 
    362 func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
    363 	bw.gsb.mu.Lock()
    364 	if !bw.gsb.balancerCurrentOrPending(bw) {
    365 		bw.gsb.mu.Unlock()
    366 		return
    367 	}
    368 	bw.gsb.mu.Unlock()
    369 	bw.gsb.cc.RemoveSubConn(sc)
    370 }
    371 
    372 func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
    373 	bw.gsb.mu.Lock()
    374 	if !bw.gsb.balancerCurrentOrPending(bw) {
    375 		bw.gsb.mu.Unlock()
    376 		return
    377 	}
    378 	bw.gsb.mu.Unlock()
    379 	bw.gsb.cc.UpdateAddresses(sc, addrs)
    380 }
    381 
// Target returns the dial target of the parent ClientConn.
func (bw *balancerWrapper) Target() string {
	return bw.gsb.cc.Target()
}