funcs.go (21739B)
1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // Package channelz defines APIs for enabling channelz service, entry 20 // registration/deletion, and accessing channelz data. It also defines channelz 21 // metric struct formats. 22 // 23 // All APIs in this package are experimental. 24 package channelz 25 26 import ( 27 "context" 28 "errors" 29 "fmt" 30 "sort" 31 "sync" 32 "sync/atomic" 33 "time" 34 35 "google.golang.org/grpc/grpclog" 36 ) 37 38 const ( 39 defaultMaxTraceEntry int32 = 30 40 ) 41 42 var ( 43 db dbWrapper 44 idGen idGenerator 45 // EntryPerPage defines the number of channelz entries to be shown on a web page. 46 EntryPerPage = int64(50) 47 curState int32 48 maxTraceEntry = defaultMaxTraceEntry 49 ) 50 51 // TurnOn turns on channelz data collection. 52 func TurnOn() { 53 if !IsOn() { 54 db.set(newChannelMap()) 55 idGen.reset() 56 atomic.StoreInt32(&curState, 1) 57 } 58 } 59 60 // IsOn returns whether channelz data collection is on. 61 func IsOn() bool { 62 return atomic.CompareAndSwapInt32(&curState, 1, 1) 63 } 64 65 // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel). 66 // Setting it to 0 will disable channel tracing. 67 func SetMaxTraceEntry(i int32) { 68 atomic.StoreInt32(&maxTraceEntry, i) 69 } 70 71 // ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default. 72 func ResetMaxTraceEntryToDefault() { 73 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) 74 } 75 76 func getMaxTraceEntry() int { 77 i := atomic.LoadInt32(&maxTraceEntry) 78 return int(i) 79 } 80 81 // dbWarpper wraps around a reference to internal channelz data storage, and 82 // provide synchronized functionality to set and get the reference. 83 type dbWrapper struct { 84 mu sync.RWMutex 85 DB *channelMap 86 } 87 88 func (d *dbWrapper) set(db *channelMap) { 89 d.mu.Lock() 90 d.DB = db 91 d.mu.Unlock() 92 } 93 94 func (d *dbWrapper) get() *channelMap { 95 d.mu.RLock() 96 defer d.mu.RUnlock() 97 return d.DB 98 } 99 100 // NewChannelzStorageForTesting initializes channelz data storage and id 101 // generator for testing purposes. 102 // 103 // Returns a cleanup function to be invoked by the test, which waits for up to 104 // 10s for all channelz state to be reset by the grpc goroutines when those 105 // entities get closed. This cleanup function helps with ensuring that tests 106 // don't mess up each other. 107 func NewChannelzStorageForTesting() (cleanup func() error) { 108 db.set(newChannelMap()) 109 idGen.reset() 110 111 return func() error { 112 cm := db.get() 113 if cm == nil { 114 return nil 115 } 116 117 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 118 defer cancel() 119 ticker := time.NewTicker(10 * time.Millisecond) 120 defer ticker.Stop() 121 for { 122 cm.mu.RLock() 123 topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets) 124 cm.mu.RUnlock() 125 126 if err := ctx.Err(); err != nil { 127 return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets) 128 } 129 if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 { 130 return nil 131 } 132 <-ticker.C 133 } 134 } 135 } 136 137 // GetTopChannels returns a slice of top channel's ChannelMetric, along with a 138 // boolean indicating whether there's more top channels to be queried for. 139 // 140 // The arg id specifies that only top channel with id at or above it will be included 141 // in the result. The returned slice is up to a length of the arg maxResults or 142 // EntryPerPage if maxResults is zero, and is sorted in ascending id order. 143 func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { 144 return db.get().GetTopChannels(id, maxResults) 145 } 146 147 // GetServers returns a slice of server's ServerMetric, along with a 148 // boolean indicating whether there's more servers to be queried for. 149 // 150 // The arg id specifies that only server with id at or above it will be included 151 // in the result. The returned slice is up to a length of the arg maxResults or 152 // EntryPerPage if maxResults is zero, and is sorted in ascending id order. 153 func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) { 154 return db.get().GetServers(id, maxResults) 155 } 156 157 // GetServerSockets returns a slice of server's (identified by id) normal socket's 158 // SocketMetric, along with a boolean indicating whether there's more sockets to 159 // be queried for. 160 // 161 // The arg startID specifies that only sockets with id at or above it will be 162 // included in the result. The returned slice is up to a length of the arg maxResults 163 // or EntryPerPage if maxResults is zero, and is sorted in ascending id order. 164 func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { 165 return db.get().GetServerSockets(id, startID, maxResults) 166 } 167 168 // GetChannel returns the ChannelMetric for the channel (identified by id). 169 func GetChannel(id int64) *ChannelMetric { 170 return db.get().GetChannel(id) 171 } 172 173 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). 174 func GetSubChannel(id int64) *SubChannelMetric { 175 return db.get().GetSubChannel(id) 176 } 177 178 // GetSocket returns the SocketInternalMetric for the socket (identified by id). 179 func GetSocket(id int64) *SocketMetric { 180 return db.get().GetSocket(id) 181 } 182 183 // GetServer returns the ServerMetric for the server (identified by id). 184 func GetServer(id int64) *ServerMetric { 185 return db.get().GetServer(id) 186 } 187 188 // RegisterChannel registers the given channel c in the channelz database with 189 // ref as its reference name, and adds it to the child list of its parent 190 // (identified by pid). pid == nil means no parent. 191 // 192 // Returns a unique channelz identifier assigned to this channel. 193 // 194 // If channelz is not turned ON, the channelz database is not mutated. 195 func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier { 196 id := idGen.genID() 197 var parent int64 198 isTopChannel := true 199 if pid != nil { 200 isTopChannel = false 201 parent = pid.Int() 202 } 203 204 if !IsOn() { 205 return newIdentifer(RefChannel, id, pid) 206 } 207 208 cn := &channel{ 209 refName: ref, 210 c: c, 211 subChans: make(map[int64]string), 212 nestedChans: make(map[int64]string), 213 id: id, 214 pid: parent, 215 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, 216 } 217 db.get().addChannel(id, cn, isTopChannel, parent) 218 return newIdentifer(RefChannel, id, pid) 219 } 220 221 // RegisterSubChannel registers the given subChannel c in the channelz database 222 // with ref as its reference name, and adds it to the child list of its parent 223 // (identified by pid). 224 // 225 // Returns a unique channelz identifier assigned to this subChannel. 226 // 227 // If channelz is not turned ON, the channelz database is not mutated. 228 func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) { 229 if pid == nil { 230 return nil, errors.New("a SubChannel's parent id cannot be nil") 231 } 232 id := idGen.genID() 233 if !IsOn() { 234 return newIdentifer(RefSubChannel, id, pid), nil 235 } 236 237 sc := &subChannel{ 238 refName: ref, 239 c: c, 240 sockets: make(map[int64]string), 241 id: id, 242 pid: pid.Int(), 243 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, 244 } 245 db.get().addSubChannel(id, sc, pid.Int()) 246 return newIdentifer(RefSubChannel, id, pid), nil 247 } 248 249 // RegisterServer registers the given server s in channelz database. It returns 250 // the unique channelz tracking id assigned to this server. 251 // 252 // If channelz is not turned ON, the channelz database is not mutated. 253 func RegisterServer(s Server, ref string) *Identifier { 254 id := idGen.genID() 255 if !IsOn() { 256 return newIdentifer(RefServer, id, nil) 257 } 258 259 svr := &server{ 260 refName: ref, 261 s: s, 262 sockets: make(map[int64]string), 263 listenSockets: make(map[int64]string), 264 id: id, 265 } 266 db.get().addServer(id, svr) 267 return newIdentifer(RefServer, id, nil) 268 } 269 270 // RegisterListenSocket registers the given listen socket s in channelz database 271 // with ref as its reference name, and add it to the child list of its parent 272 // (identified by pid). It returns the unique channelz tracking id assigned to 273 // this listen socket. 274 // 275 // If channelz is not turned ON, the channelz database is not mutated. 276 func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { 277 if pid == nil { 278 return nil, errors.New("a ListenSocket's parent id cannot be 0") 279 } 280 id := idGen.genID() 281 if !IsOn() { 282 return newIdentifer(RefListenSocket, id, pid), nil 283 } 284 285 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()} 286 db.get().addListenSocket(id, ls, pid.Int()) 287 return newIdentifer(RefListenSocket, id, pid), nil 288 } 289 290 // RegisterNormalSocket registers the given normal socket s in channelz database 291 // with ref as its reference name, and adds it to the child list of its parent 292 // (identified by pid). It returns the unique channelz tracking id assigned to 293 // this normal socket. 294 // 295 // If channelz is not turned ON, the channelz database is not mutated. 296 func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { 297 if pid == nil { 298 return nil, errors.New("a NormalSocket's parent id cannot be 0") 299 } 300 id := idGen.genID() 301 if !IsOn() { 302 return newIdentifer(RefNormalSocket, id, pid), nil 303 } 304 305 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()} 306 db.get().addNormalSocket(id, ns, pid.Int()) 307 return newIdentifer(RefNormalSocket, id, pid), nil 308 } 309 310 // RemoveEntry removes an entry with unique channelz tracking id to be id from 311 // channelz database. 312 // 313 // If channelz is not turned ON, this function is a no-op. 314 func RemoveEntry(id *Identifier) { 315 if !IsOn() { 316 return 317 } 318 db.get().removeEntry(id.Int()) 319 } 320 321 // TraceEventDesc is what the caller of AddTraceEvent should provide to describe 322 // the event to be added to the channel trace. 323 // 324 // The Parent field is optional. It is used for an event that will be recorded 325 // in the entity's parent trace. 326 type TraceEventDesc struct { 327 Desc string 328 Severity Severity 329 Parent *TraceEventDesc 330 } 331 332 // AddTraceEvent adds trace related to the entity with specified id, using the 333 // provided TraceEventDesc. 334 // 335 // If channelz is not turned ON, this will simply log the event descriptions. 336 func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) { 337 // Log only the trace description associated with the bottom most entity. 338 switch desc.Severity { 339 case CtUnknown, CtInfo: 340 l.InfoDepth(depth+1, withParens(id)+desc.Desc) 341 case CtWarning: 342 l.WarningDepth(depth+1, withParens(id)+desc.Desc) 343 case CtError: 344 l.ErrorDepth(depth+1, withParens(id)+desc.Desc) 345 } 346 347 if getMaxTraceEntry() == 0 { 348 return 349 } 350 if IsOn() { 351 db.get().traceEvent(id.Int(), desc) 352 } 353 } 354 355 // channelMap is the storage data structure for channelz. 356 // Methods of channelMap can be divided in two two categories with respect to locking. 357 // 1. Methods acquire the global lock. 358 // 2. Methods that can only be called when global lock is held. 359 // A second type of method need always to be called inside a first type of method. 360 type channelMap struct { 361 mu sync.RWMutex 362 topLevelChannels map[int64]struct{} 363 servers map[int64]*server 364 channels map[int64]*channel 365 subChannels map[int64]*subChannel 366 listenSockets map[int64]*listenSocket 367 normalSockets map[int64]*normalSocket 368 } 369 370 func newChannelMap() *channelMap { 371 return &channelMap{ 372 topLevelChannels: make(map[int64]struct{}), 373 channels: make(map[int64]*channel), 374 listenSockets: make(map[int64]*listenSocket), 375 normalSockets: make(map[int64]*normalSocket), 376 servers: make(map[int64]*server), 377 subChannels: make(map[int64]*subChannel), 378 } 379 } 380 381 func (c *channelMap) addServer(id int64, s *server) { 382 c.mu.Lock() 383 s.cm = c 384 c.servers[id] = s 385 c.mu.Unlock() 386 } 387 388 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) { 389 c.mu.Lock() 390 cn.cm = c 391 cn.trace.cm = c 392 c.channels[id] = cn 393 if isTopChannel { 394 c.topLevelChannels[id] = struct{}{} 395 } else { 396 c.findEntry(pid).addChild(id, cn) 397 } 398 c.mu.Unlock() 399 } 400 401 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) { 402 c.mu.Lock() 403 sc.cm = c 404 sc.trace.cm = c 405 c.subChannels[id] = sc 406 c.findEntry(pid).addChild(id, sc) 407 c.mu.Unlock() 408 } 409 410 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) { 411 c.mu.Lock() 412 ls.cm = c 413 c.listenSockets[id] = ls 414 c.findEntry(pid).addChild(id, ls) 415 c.mu.Unlock() 416 } 417 418 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) { 419 c.mu.Lock() 420 ns.cm = c 421 c.normalSockets[id] = ns 422 c.findEntry(pid).addChild(id, ns) 423 c.mu.Unlock() 424 } 425 426 // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to 427 // wait on the deletion of its children and until no other entity's channel trace references it. 428 // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully 429 // shutting down server will lead to the server being also deleted. 430 func (c *channelMap) removeEntry(id int64) { 431 c.mu.Lock() 432 c.findEntry(id).triggerDelete() 433 c.mu.Unlock() 434 } 435 436 // c.mu must be held by the caller 437 func (c *channelMap) decrTraceRefCount(id int64) { 438 e := c.findEntry(id) 439 if v, ok := e.(tracedChannel); ok { 440 v.decrTraceRefCount() 441 e.deleteSelfIfReady() 442 } 443 } 444 445 // c.mu must be held by the caller. 446 func (c *channelMap) findEntry(id int64) entry { 447 var v entry 448 var ok bool 449 if v, ok = c.channels[id]; ok { 450 return v 451 } 452 if v, ok = c.subChannels[id]; ok { 453 return v 454 } 455 if v, ok = c.servers[id]; ok { 456 return v 457 } 458 if v, ok = c.listenSockets[id]; ok { 459 return v 460 } 461 if v, ok = c.normalSockets[id]; ok { 462 return v 463 } 464 return &dummyEntry{idNotFound: id} 465 } 466 467 // c.mu must be held by the caller 468 // deleteEntry simply deletes an entry from the channelMap. Before calling this 469 // method, caller must check this entry is ready to be deleted, i.e removeEntry() 470 // has been called on it, and no children still exist. 471 // Conditionals are ordered by the expected frequency of deletion of each entity 472 // type, in order to optimize performance. 473 func (c *channelMap) deleteEntry(id int64) { 474 var ok bool 475 if _, ok = c.normalSockets[id]; ok { 476 delete(c.normalSockets, id) 477 return 478 } 479 if _, ok = c.subChannels[id]; ok { 480 delete(c.subChannels, id) 481 return 482 } 483 if _, ok = c.channels[id]; ok { 484 delete(c.channels, id) 485 delete(c.topLevelChannels, id) 486 return 487 } 488 if _, ok = c.listenSockets[id]; ok { 489 delete(c.listenSockets, id) 490 return 491 } 492 if _, ok = c.servers[id]; ok { 493 delete(c.servers, id) 494 return 495 } 496 } 497 498 func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) { 499 c.mu.Lock() 500 child := c.findEntry(id) 501 childTC, ok := child.(tracedChannel) 502 if !ok { 503 c.mu.Unlock() 504 return 505 } 506 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) 507 if desc.Parent != nil { 508 parent := c.findEntry(child.getParentID()) 509 var chanType RefChannelType 510 switch child.(type) { 511 case *channel: 512 chanType = RefChannel 513 case *subChannel: 514 chanType = RefSubChannel 515 } 516 if parentTC, ok := parent.(tracedChannel); ok { 517 parentTC.getChannelTrace().append(&TraceEvent{ 518 Desc: desc.Parent.Desc, 519 Severity: desc.Parent.Severity, 520 Timestamp: time.Now(), 521 RefID: id, 522 RefName: childTC.getRefName(), 523 RefType: chanType, 524 }) 525 childTC.incrTraceRefCount() 526 } 527 } 528 c.mu.Unlock() 529 } 530 531 type int64Slice []int64 532 533 func (s int64Slice) Len() int { return len(s) } 534 func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 535 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } 536 537 func copyMap(m map[int64]string) map[int64]string { 538 n := make(map[int64]string) 539 for k, v := range m { 540 n[k] = v 541 } 542 return n 543 } 544 545 func min(a, b int64) int64 { 546 if a < b { 547 return a 548 } 549 return b 550 } 551 552 func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { 553 if maxResults <= 0 { 554 maxResults = EntryPerPage 555 } 556 c.mu.RLock() 557 l := int64(len(c.topLevelChannels)) 558 ids := make([]int64, 0, l) 559 cns := make([]*channel, 0, min(l, maxResults)) 560 561 for k := range c.topLevelChannels { 562 ids = append(ids, k) 563 } 564 sort.Sort(int64Slice(ids)) 565 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 566 count := int64(0) 567 var end bool 568 var t []*ChannelMetric 569 for i, v := range ids[idx:] { 570 if count == maxResults { 571 break 572 } 573 if cn, ok := c.channels[v]; ok { 574 cns = append(cns, cn) 575 t = append(t, &ChannelMetric{ 576 NestedChans: copyMap(cn.nestedChans), 577 SubChans: copyMap(cn.subChans), 578 }) 579 count++ 580 } 581 if i == len(ids[idx:])-1 { 582 end = true 583 break 584 } 585 } 586 c.mu.RUnlock() 587 if count == 0 { 588 end = true 589 } 590 591 for i, cn := range cns { 592 t[i].ChannelData = cn.c.ChannelzMetric() 593 t[i].ID = cn.id 594 t[i].RefName = cn.refName 595 t[i].Trace = cn.trace.dumpData() 596 } 597 return t, end 598 } 599 600 func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) { 601 if maxResults <= 0 { 602 maxResults = EntryPerPage 603 } 604 c.mu.RLock() 605 l := int64(len(c.servers)) 606 ids := make([]int64, 0, l) 607 ss := make([]*server, 0, min(l, maxResults)) 608 for k := range c.servers { 609 ids = append(ids, k) 610 } 611 sort.Sort(int64Slice(ids)) 612 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 613 count := int64(0) 614 var end bool 615 var s []*ServerMetric 616 for i, v := range ids[idx:] { 617 if count == maxResults { 618 break 619 } 620 if svr, ok := c.servers[v]; ok { 621 ss = append(ss, svr) 622 s = append(s, &ServerMetric{ 623 ListenSockets: copyMap(svr.listenSockets), 624 }) 625 count++ 626 } 627 if i == len(ids[idx:])-1 { 628 end = true 629 break 630 } 631 } 632 c.mu.RUnlock() 633 if count == 0 { 634 end = true 635 } 636 637 for i, svr := range ss { 638 s[i].ServerData = svr.s.ChannelzMetric() 639 s[i].ID = svr.id 640 s[i].RefName = svr.refName 641 } 642 return s, end 643 } 644 645 func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { 646 if maxResults <= 0 { 647 maxResults = EntryPerPage 648 } 649 var svr *server 650 var ok bool 651 c.mu.RLock() 652 if svr, ok = c.servers[id]; !ok { 653 // server with id doesn't exist. 654 c.mu.RUnlock() 655 return nil, true 656 } 657 svrskts := svr.sockets 658 l := int64(len(svrskts)) 659 ids := make([]int64, 0, l) 660 sks := make([]*normalSocket, 0, min(l, maxResults)) 661 for k := range svrskts { 662 ids = append(ids, k) 663 } 664 sort.Sort(int64Slice(ids)) 665 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID }) 666 count := int64(0) 667 var end bool 668 for i, v := range ids[idx:] { 669 if count == maxResults { 670 break 671 } 672 if ns, ok := c.normalSockets[v]; ok { 673 sks = append(sks, ns) 674 count++ 675 } 676 if i == len(ids[idx:])-1 { 677 end = true 678 break 679 } 680 } 681 c.mu.RUnlock() 682 if count == 0 { 683 end = true 684 } 685 s := make([]*SocketMetric, 0, len(sks)) 686 for _, ns := range sks { 687 sm := &SocketMetric{} 688 sm.SocketData = ns.s.ChannelzMetric() 689 sm.ID = ns.id 690 sm.RefName = ns.refName 691 s = append(s, sm) 692 } 693 return s, end 694 } 695 696 func (c *channelMap) GetChannel(id int64) *ChannelMetric { 697 cm := &ChannelMetric{} 698 var cn *channel 699 var ok bool 700 c.mu.RLock() 701 if cn, ok = c.channels[id]; !ok { 702 // channel with id doesn't exist. 703 c.mu.RUnlock() 704 return nil 705 } 706 cm.NestedChans = copyMap(cn.nestedChans) 707 cm.SubChans = copyMap(cn.subChans) 708 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when 709 // holding the lock to prevent potential data race. 710 chanCopy := cn.c 711 c.mu.RUnlock() 712 cm.ChannelData = chanCopy.ChannelzMetric() 713 cm.ID = cn.id 714 cm.RefName = cn.refName 715 cm.Trace = cn.trace.dumpData() 716 return cm 717 } 718 719 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { 720 cm := &SubChannelMetric{} 721 var sc *subChannel 722 var ok bool 723 c.mu.RLock() 724 if sc, ok = c.subChannels[id]; !ok { 725 // subchannel with id doesn't exist. 726 c.mu.RUnlock() 727 return nil 728 } 729 cm.Sockets = copyMap(sc.sockets) 730 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when 731 // holding the lock to prevent potential data race. 732 chanCopy := sc.c 733 c.mu.RUnlock() 734 cm.ChannelData = chanCopy.ChannelzMetric() 735 cm.ID = sc.id 736 cm.RefName = sc.refName 737 cm.Trace = sc.trace.dumpData() 738 return cm 739 } 740 741 func (c *channelMap) GetSocket(id int64) *SocketMetric { 742 sm := &SocketMetric{} 743 c.mu.RLock() 744 if ls, ok := c.listenSockets[id]; ok { 745 c.mu.RUnlock() 746 sm.SocketData = ls.s.ChannelzMetric() 747 sm.ID = ls.id 748 sm.RefName = ls.refName 749 return sm 750 } 751 if ns, ok := c.normalSockets[id]; ok { 752 c.mu.RUnlock() 753 sm.SocketData = ns.s.ChannelzMetric() 754 sm.ID = ns.id 755 sm.RefName = ns.refName 756 return sm 757 } 758 c.mu.RUnlock() 759 return nil 760 } 761 762 func (c *channelMap) GetServer(id int64) *ServerMetric { 763 sm := &ServerMetric{} 764 var svr *server 765 var ok bool 766 c.mu.RLock() 767 if svr, ok = c.servers[id]; !ok { 768 c.mu.RUnlock() 769 return nil 770 } 771 sm.ListenSockets = copyMap(svr.listenSockets) 772 c.mu.RUnlock() 773 sm.ID = svr.id 774 sm.RefName = svr.refName 775 sm.ServerData = svr.s.ChannelzMetric() 776 return sm 777 } 778 779 type idGenerator struct { 780 id int64 781 } 782 783 func (i *idGenerator) reset() { 784 atomic.StoreInt64(&i.id, 0) 785 } 786 787 func (i *idGenerator) genID() int64 { 788 return atomic.AddInt64(&i.id, 1) 789 }