balancer.go (8208B)
1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package base 20 21 import ( 22 "errors" 23 "fmt" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/grpclog" 28 "google.golang.org/grpc/resolver" 29 ) 30 31 var logger = grpclog.Component("balancer") 32 33 type baseBuilder struct { 34 name string 35 pickerBuilder PickerBuilder 36 config Config 37 } 38 39 func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 40 bal := &baseBalancer{ 41 cc: cc, 42 pickerBuilder: bb.pickerBuilder, 43 44 subConns: resolver.NewAddressMap(), 45 scStates: make(map[balancer.SubConn]connectivity.State), 46 csEvltr: &balancer.ConnectivityStateEvaluator{}, 47 config: bb.config, 48 state: connectivity.Connecting, 49 } 50 // Initialize picker to a picker that always returns 51 // ErrNoSubConnAvailable, because when state of a SubConn changes, we 52 // may call UpdateState with this picker. 53 bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable) 54 return bal 55 } 56 57 func (bb *baseBuilder) Name() string { 58 return bb.name 59 } 60 61 type baseBalancer struct { 62 cc balancer.ClientConn 63 pickerBuilder PickerBuilder 64 65 csEvltr *balancer.ConnectivityStateEvaluator 66 state connectivity.State 67 68 subConns *resolver.AddressMap 69 scStates map[balancer.SubConn]connectivity.State 70 picker balancer.Picker 71 config Config 72 73 resolverErr error // the last error reported by the resolver; cleared on successful resolution 74 connErr error // the last connection error; cleared upon leaving TransientFailure 75 } 76 77 func (b *baseBalancer) ResolverError(err error) { 78 b.resolverErr = err 79 if b.subConns.Len() == 0 { 80 b.state = connectivity.TransientFailure 81 } 82 83 if b.state != connectivity.TransientFailure { 84 // The picker will not change since the balancer does not currently 85 // report an error. 86 return 87 } 88 b.regeneratePicker() 89 b.cc.UpdateState(balancer.State{ 90 ConnectivityState: b.state, 91 Picker: b.picker, 92 }) 93 } 94 95 func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { 96 // TODO: handle s.ResolverState.ServiceConfig? 97 if logger.V(2) { 98 logger.Info("base.baseBalancer: got new ClientConn state: ", s) 99 } 100 // Successful resolution; clear resolver error and ensure we return nil. 101 b.resolverErr = nil 102 // addrsSet is the set converted from addrs, it's used for quick lookup of an address. 103 addrsSet := resolver.NewAddressMap() 104 for _, a := range s.ResolverState.Addresses { 105 addrsSet.Set(a, nil) 106 if _, ok := b.subConns.Get(a); !ok { 107 // a is a new address (not existing in b.subConns). 108 sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) 109 if err != nil { 110 logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) 111 continue 112 } 113 b.subConns.Set(a, sc) 114 b.scStates[sc] = connectivity.Idle 115 b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle) 116 sc.Connect() 117 } 118 } 119 for _, a := range b.subConns.Keys() { 120 sci, _ := b.subConns.Get(a) 121 sc := sci.(balancer.SubConn) 122 // a was removed by resolver. 123 if _, ok := addrsSet.Get(a); !ok { 124 b.cc.RemoveSubConn(sc) 125 b.subConns.Delete(a) 126 // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. 127 // The entry will be deleted in UpdateSubConnState. 128 } 129 } 130 // If resolver state contains no addresses, return an error so ClientConn 131 // will trigger re-resolve. Also records this as an resolver error, so when 132 // the overall state turns transient failure, the error message will have 133 // the zero address information. 134 if len(s.ResolverState.Addresses) == 0 { 135 b.ResolverError(errors.New("produced zero addresses")) 136 return balancer.ErrBadResolverState 137 } 138 139 b.regeneratePicker() 140 b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) 141 return nil 142 } 143 144 // mergeErrors builds an error from the last connection error and the last 145 // resolver error. Must only be called if b.state is TransientFailure. 146 func (b *baseBalancer) mergeErrors() error { 147 // connErr must always be non-nil unless there are no SubConns, in which 148 // case resolverErr must be non-nil. 149 if b.connErr == nil { 150 return fmt.Errorf("last resolver error: %v", b.resolverErr) 151 } 152 if b.resolverErr == nil { 153 return fmt.Errorf("last connection error: %v", b.connErr) 154 } 155 return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr) 156 } 157 158 // regeneratePicker takes a snapshot of the balancer, and generates a picker 159 // from it. The picker is 160 // - errPicker if the balancer is in TransientFailure, 161 // - built by the pickerBuilder with all READY SubConns otherwise. 162 func (b *baseBalancer) regeneratePicker() { 163 if b.state == connectivity.TransientFailure { 164 b.picker = NewErrPicker(b.mergeErrors()) 165 return 166 } 167 readySCs := make(map[balancer.SubConn]SubConnInfo) 168 169 // Filter out all ready SCs from full subConn map. 170 for _, addr := range b.subConns.Keys() { 171 sci, _ := b.subConns.Get(addr) 172 sc := sci.(balancer.SubConn) 173 if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { 174 readySCs[sc] = SubConnInfo{Address: addr} 175 } 176 } 177 b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) 178 } 179 180 func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 181 s := state.ConnectivityState 182 if logger.V(2) { 183 logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) 184 } 185 oldS, ok := b.scStates[sc] 186 if !ok { 187 if logger.V(2) { 188 logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) 189 } 190 return 191 } 192 if oldS == connectivity.TransientFailure && 193 (s == connectivity.Connecting || s == connectivity.Idle) { 194 // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or 195 // CONNECTING transitions to prevent the aggregated state from being 196 // always CONNECTING when many backends exist but are all down. 197 if s == connectivity.Idle { 198 sc.Connect() 199 } 200 return 201 } 202 b.scStates[sc] = s 203 switch s { 204 case connectivity.Idle: 205 sc.Connect() 206 case connectivity.Shutdown: 207 // When an address was removed by resolver, b called RemoveSubConn but 208 // kept the sc's state in scStates. Remove state for this sc here. 209 delete(b.scStates, sc) 210 case connectivity.TransientFailure: 211 // Save error to be reported via picker. 212 b.connErr = state.ConnectionError 213 } 214 215 b.state = b.csEvltr.RecordTransition(oldS, s) 216 217 // Regenerate picker when one of the following happens: 218 // - this sc entered or left ready 219 // - the aggregated state of balancer is TransientFailure 220 // (may need to update error message) 221 if (s == connectivity.Ready) != (oldS == connectivity.Ready) || 222 b.state == connectivity.TransientFailure { 223 b.regeneratePicker() 224 } 225 b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) 226 } 227 228 // Close is a nop because base balancer doesn't have internal state to clean up, 229 // and it doesn't need to call RemoveSubConn for the SubConns. 230 func (b *baseBalancer) Close() { 231 } 232 233 // ExitIdle is a nop because the base balancer attempts to stay connected to 234 // all SubConns at all times. 235 func (b *baseBalancer) ExitIdle() { 236 } 237 238 // NewErrPicker returns a Picker that always returns err on Pick(). 239 func NewErrPicker(err error) balancer.Picker { 240 return &errPicker{err: err} 241 } 242 243 // NewErrPickerV2 is temporarily defined for backward compatibility reasons. 244 // 245 // Deprecated: use NewErrPicker instead. 246 var NewErrPickerV2 = NewErrPicker 247 248 type errPicker struct { 249 err error // Pick() always returns this err. 250 } 251 252 func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { 253 return balancer.PickResult{}, p.err 254 }