resolver_conn_wrapper.go (5696B)
1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "strings" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/credentials" 27 "google.golang.org/grpc/internal/channelz" 28 "google.golang.org/grpc/internal/grpcsync" 29 "google.golang.org/grpc/internal/pretty" 30 "google.golang.org/grpc/resolver" 31 "google.golang.org/grpc/serviceconfig" 32 ) 33 34 // ccResolverWrapper is a wrapper on top of cc for resolvers. 35 // It implements resolver.ClientConn interface. 36 type ccResolverWrapper struct { 37 cc *ClientConn 38 resolverMu sync.Mutex 39 resolver resolver.Resolver 40 done *grpcsync.Event 41 curState resolver.State 42 43 incomingMu sync.Mutex // Synchronizes all the incoming calls. 44 } 45 46 // newCCResolverWrapper uses the resolver.Builder to build a Resolver and 47 // returns a ccResolverWrapper object which wraps the newly built resolver. 48 func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) { 49 ccr := &ccResolverWrapper{ 50 cc: cc, 51 done: grpcsync.NewEvent(), 52 } 53 54 var credsClone credentials.TransportCredentials 55 if creds := cc.dopts.copts.TransportCredentials; creds != nil { 56 credsClone = creds.Clone() 57 } 58 rbo := resolver.BuildOptions{ 59 DisableServiceConfig: cc.dopts.disableServiceConfig, 60 DialCreds: credsClone, 61 CredsBundle: cc.dopts.copts.CredsBundle, 62 Dialer: cc.dopts.copts.Dialer, 63 } 64 65 var err error 66 // We need to hold the lock here while we assign to the ccr.resolver field 67 // to guard against a data race caused by the following code path, 68 // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up 69 // accessing ccr.resolver which is being assigned here. 70 ccr.resolverMu.Lock() 71 defer ccr.resolverMu.Unlock() 72 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo) 73 if err != nil { 74 return nil, err 75 } 76 return ccr, nil 77 } 78 79 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { 80 ccr.resolverMu.Lock() 81 if !ccr.done.HasFired() { 82 ccr.resolver.ResolveNow(o) 83 } 84 ccr.resolverMu.Unlock() 85 } 86 87 func (ccr *ccResolverWrapper) close() { 88 ccr.resolverMu.Lock() 89 ccr.resolver.Close() 90 ccr.done.Fire() 91 ccr.resolverMu.Unlock() 92 } 93 94 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { 95 ccr.incomingMu.Lock() 96 defer ccr.incomingMu.Unlock() 97 if ccr.done.HasFired() { 98 return nil 99 } 100 ccr.addChannelzTraceEvent(s) 101 ccr.curState = s 102 if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { 103 return balancer.ErrBadResolverState 104 } 105 return nil 106 } 107 108 func (ccr *ccResolverWrapper) ReportError(err error) { 109 ccr.incomingMu.Lock() 110 defer ccr.incomingMu.Unlock() 111 if ccr.done.HasFired() { 112 return 113 } 114 channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) 115 ccr.cc.updateResolverState(resolver.State{}, err) 116 } 117 118 // NewAddress is called by the resolver implementation to send addresses to gRPC. 119 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { 120 ccr.incomingMu.Lock() 121 defer ccr.incomingMu.Unlock() 122 if ccr.done.HasFired() { 123 return 124 } 125 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) 126 ccr.curState.Addresses = addrs 127 ccr.cc.updateResolverState(ccr.curState, nil) 128 } 129 130 // NewServiceConfig is called by the resolver implementation to send service 131 // configs to gRPC. 132 func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { 133 ccr.incomingMu.Lock() 134 defer ccr.incomingMu.Unlock() 135 if ccr.done.HasFired() { 136 return 137 } 138 channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc) 139 if ccr.cc.dopts.disableServiceConfig { 140 channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config") 141 return 142 } 143 scpr := parseServiceConfig(sc) 144 if scpr.Err != nil { 145 channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) 146 return 147 } 148 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) 149 ccr.curState.ServiceConfig = scpr 150 ccr.cc.updateResolverState(ccr.curState, nil) 151 } 152 153 func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { 154 return parseServiceConfig(scJSON) 155 } 156 157 func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { 158 var updates []string 159 var oldSC, newSC *ServiceConfig 160 var oldOK, newOK bool 161 if ccr.curState.ServiceConfig != nil { 162 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig) 163 } 164 if s.ServiceConfig != nil { 165 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig) 166 } 167 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) { 168 updates = append(updates, "service config updated") 169 } 170 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { 171 updates = append(updates, "resolver returned an empty address list") 172 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { 173 updates = append(updates, "resolver returned new addresses") 174 } 175 channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) 176 }