gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

funcs.go (21739B)


      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 // Package channelz defines APIs for enabling channelz service, entry
     20 // registration/deletion, and accessing channelz data. It also defines channelz
     21 // metric struct formats.
     22 //
     23 // All APIs in this package are experimental.
     24 package channelz
     25 
     26 import (
     27 	"context"
     28 	"errors"
     29 	"fmt"
     30 	"sort"
     31 	"sync"
     32 	"sync/atomic"
     33 	"time"
     34 
     35 	"google.golang.org/grpc/grpclog"
     36 )
     37 
     38 const (
     39 	defaultMaxTraceEntry int32 = 30
     40 )
     41 
     42 var (
     43 	db    dbWrapper
     44 	idGen idGenerator
     45 	// EntryPerPage defines the number of channelz entries to be shown on a web page.
     46 	EntryPerPage  = int64(50)
     47 	curState      int32
     48 	maxTraceEntry = defaultMaxTraceEntry
     49 )
     50 
     51 // TurnOn turns on channelz data collection.
     52 func TurnOn() {
     53 	if !IsOn() {
     54 		db.set(newChannelMap())
     55 		idGen.reset()
     56 		atomic.StoreInt32(&curState, 1)
     57 	}
     58 }
     59 
     60 // IsOn returns whether channelz data collection is on.
     61 func IsOn() bool {
     62 	return atomic.CompareAndSwapInt32(&curState, 1, 1)
     63 }
     64 
     65 // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
     66 // Setting it to 0 will disable channel tracing.
     67 func SetMaxTraceEntry(i int32) {
     68 	atomic.StoreInt32(&maxTraceEntry, i)
     69 }
     70 
// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per
// entity to the default (defaultMaxTraceEntry). The value is stored atomically.
func ResetMaxTraceEntryToDefault() {
	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}
     75 
     76 func getMaxTraceEntry() int {
     77 	i := atomic.LoadInt32(&maxTraceEntry)
     78 	return int(i)
     79 }
     80 
     81 // dbWarpper wraps around a reference to internal channelz data storage, and
     82 // provide synchronized functionality to set and get the reference.
     83 type dbWrapper struct {
     84 	mu sync.RWMutex
     85 	DB *channelMap
     86 }
     87 
     88 func (d *dbWrapper) set(db *channelMap) {
     89 	d.mu.Lock()
     90 	d.DB = db
     91 	d.mu.Unlock()
     92 }
     93 
     94 func (d *dbWrapper) get() *channelMap {
     95 	d.mu.RLock()
     96 	defer d.mu.RUnlock()
     97 	return d.DB
     98 }
     99 
    100 // NewChannelzStorageForTesting initializes channelz data storage and id
    101 // generator for testing purposes.
    102 //
    103 // Returns a cleanup function to be invoked by the test, which waits for up to
    104 // 10s for all channelz state to be reset by the grpc goroutines when those
    105 // entities get closed. This cleanup function helps with ensuring that tests
    106 // don't mess up each other.
    107 func NewChannelzStorageForTesting() (cleanup func() error) {
    108 	db.set(newChannelMap())
    109 	idGen.reset()
    110 
    111 	return func() error {
    112 		cm := db.get()
    113 		if cm == nil {
    114 			return nil
    115 		}
    116 
    117 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    118 		defer cancel()
    119 		ticker := time.NewTicker(10 * time.Millisecond)
    120 		defer ticker.Stop()
    121 		for {
    122 			cm.mu.RLock()
    123 			topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
    124 			cm.mu.RUnlock()
    125 
    126 			if err := ctx.Err(); err != nil {
    127 				return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
    128 			}
    129 			if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
    130 				return nil
    131 			}
    132 			<-ticker.C
    133 		}
    134 	}
    135 }
    136 
    137 // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
    138 // boolean indicating whether there's more top channels to be queried for.
    139 //
    140 // The arg id specifies that only top channel with id at or above it will be included
    141 // in the result. The returned slice is up to a length of the arg maxResults or
    142 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
    143 func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
    144 	return db.get().GetTopChannels(id, maxResults)
    145 }
    146 
    147 // GetServers returns a slice of server's ServerMetric, along with a
    148 // boolean indicating whether there's more servers to be queried for.
    149 //
    150 // The arg id specifies that only server with id at or above it will be included
    151 // in the result. The returned slice is up to a length of the arg maxResults or
    152 // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
    153 func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
    154 	return db.get().GetServers(id, maxResults)
    155 }
    156 
    157 // GetServerSockets returns a slice of server's (identified by id) normal socket's
    158 // SocketMetric, along with a boolean indicating whether there's more sockets to
    159 // be queried for.
    160 //
    161 // The arg startID specifies that only sockets with id at or above it will be
    162 // included in the result. The returned slice is up to a length of the arg maxResults
    163 // or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
    164 func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
    165 	return db.get().GetServerSockets(id, startID, maxResults)
    166 }
    167 
    168 // GetChannel returns the ChannelMetric for the channel (identified by id).
    169 func GetChannel(id int64) *ChannelMetric {
    170 	return db.get().GetChannel(id)
    171 }
    172 
    173 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
    174 func GetSubChannel(id int64) *SubChannelMetric {
    175 	return db.get().GetSubChannel(id)
    176 }
    177 
    178 // GetSocket returns the SocketInternalMetric for the socket (identified by id).
    179 func GetSocket(id int64) *SocketMetric {
    180 	return db.get().GetSocket(id)
    181 }
    182 
    183 // GetServer returns the ServerMetric for the server (identified by id).
    184 func GetServer(id int64) *ServerMetric {
    185 	return db.get().GetServer(id)
    186 }
    187 
    188 // RegisterChannel registers the given channel c in the channelz database with
    189 // ref as its reference name, and adds it to the child list of its parent
    190 // (identified by pid). pid == nil means no parent.
    191 //
    192 // Returns a unique channelz identifier assigned to this channel.
    193 //
    194 // If channelz is not turned ON, the channelz database is not mutated.
    195 func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
    196 	id := idGen.genID()
    197 	var parent int64
    198 	isTopChannel := true
    199 	if pid != nil {
    200 		isTopChannel = false
    201 		parent = pid.Int()
    202 	}
    203 
    204 	if !IsOn() {
    205 		return newIdentifer(RefChannel, id, pid)
    206 	}
    207 
    208 	cn := &channel{
    209 		refName:     ref,
    210 		c:           c,
    211 		subChans:    make(map[int64]string),
    212 		nestedChans: make(map[int64]string),
    213 		id:          id,
    214 		pid:         parent,
    215 		trace:       &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
    216 	}
    217 	db.get().addChannel(id, cn, isTopChannel, parent)
    218 	return newIdentifer(RefChannel, id, pid)
    219 }
    220 
    221 // RegisterSubChannel registers the given subChannel c in the channelz database
    222 // with ref as its reference name, and adds it to the child list of its parent
    223 // (identified by pid).
    224 //
    225 // Returns a unique channelz identifier assigned to this subChannel.
    226 //
    227 // If channelz is not turned ON, the channelz database is not mutated.
    228 func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
    229 	if pid == nil {
    230 		return nil, errors.New("a SubChannel's parent id cannot be nil")
    231 	}
    232 	id := idGen.genID()
    233 	if !IsOn() {
    234 		return newIdentifer(RefSubChannel, id, pid), nil
    235 	}
    236 
    237 	sc := &subChannel{
    238 		refName: ref,
    239 		c:       c,
    240 		sockets: make(map[int64]string),
    241 		id:      id,
    242 		pid:     pid.Int(),
    243 		trace:   &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
    244 	}
    245 	db.get().addSubChannel(id, sc, pid.Int())
    246 	return newIdentifer(RefSubChannel, id, pid), nil
    247 }
    248 
    249 // RegisterServer registers the given server s in channelz database. It returns
    250 // the unique channelz tracking id assigned to this server.
    251 //
    252 // If channelz is not turned ON, the channelz database is not mutated.
    253 func RegisterServer(s Server, ref string) *Identifier {
    254 	id := idGen.genID()
    255 	if !IsOn() {
    256 		return newIdentifer(RefServer, id, nil)
    257 	}
    258 
    259 	svr := &server{
    260 		refName:       ref,
    261 		s:             s,
    262 		sockets:       make(map[int64]string),
    263 		listenSockets: make(map[int64]string),
    264 		id:            id,
    265 	}
    266 	db.get().addServer(id, svr)
    267 	return newIdentifer(RefServer, id, nil)
    268 }
    269 
    270 // RegisterListenSocket registers the given listen socket s in channelz database
    271 // with ref as its reference name, and add it to the child list of its parent
    272 // (identified by pid). It returns the unique channelz tracking id assigned to
    273 // this listen socket.
    274 //
    275 // If channelz is not turned ON, the channelz database is not mutated.
    276 func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
    277 	if pid == nil {
    278 		return nil, errors.New("a ListenSocket's parent id cannot be 0")
    279 	}
    280 	id := idGen.genID()
    281 	if !IsOn() {
    282 		return newIdentifer(RefListenSocket, id, pid), nil
    283 	}
    284 
    285 	ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
    286 	db.get().addListenSocket(id, ls, pid.Int())
    287 	return newIdentifer(RefListenSocket, id, pid), nil
    288 }
    289 
    290 // RegisterNormalSocket registers the given normal socket s in channelz database
    291 // with ref as its reference name, and adds it to the child list of its parent
    292 // (identified by pid). It returns the unique channelz tracking id assigned to
    293 // this normal socket.
    294 //
    295 // If channelz is not turned ON, the channelz database is not mutated.
    296 func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
    297 	if pid == nil {
    298 		return nil, errors.New("a NormalSocket's parent id cannot be 0")
    299 	}
    300 	id := idGen.genID()
    301 	if !IsOn() {
    302 		return newIdentifer(RefNormalSocket, id, pid), nil
    303 	}
    304 
    305 	ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
    306 	db.get().addNormalSocket(id, ns, pid.Int())
    307 	return newIdentifer(RefNormalSocket, id, pid), nil
    308 }
    309 
    310 // RemoveEntry removes an entry with unique channelz tracking id to be id from
    311 // channelz database.
    312 //
    313 // If channelz is not turned ON, this function is a no-op.
    314 func RemoveEntry(id *Identifier) {
    315 	if !IsOn() {
    316 		return
    317 	}
    318 	db.get().removeEntry(id.Int())
    319 }
    320 
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
// the event to be added to the channel trace.
//
// The Parent field is optional. It is used for an event that will be recorded
// in the entity's parent trace.
type TraceEventDesc struct {
	// Desc is the event text; AddTraceEvent logs it and it is copied into the
	// recorded TraceEvent.
	Desc string
	// Severity selects the log level used by AddTraceEvent and is copied into
	// the recorded TraceEvent.
	Severity Severity
	// Parent, when non-nil, supplies the Desc and Severity of an additional
	// event appended to the parent entity's trace.
	Parent *TraceEventDesc
}
    331 
    332 // AddTraceEvent adds trace related to the entity with specified id, using the
    333 // provided TraceEventDesc.
    334 //
    335 // If channelz is not turned ON, this will simply log the event descriptions.
    336 func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
    337 	// Log only the trace description associated with the bottom most entity.
    338 	switch desc.Severity {
    339 	case CtUnknown, CtInfo:
    340 		l.InfoDepth(depth+1, withParens(id)+desc.Desc)
    341 	case CtWarning:
    342 		l.WarningDepth(depth+1, withParens(id)+desc.Desc)
    343 	case CtError:
    344 		l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
    345 	}
    346 
    347 	if getMaxTraceEntry() == 0 {
    348 		return
    349 	}
    350 	if IsOn() {
    351 		db.get().traceEvent(id.Int(), desc)
    352 	}
    353 }
    354 
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided into two categories with respect to locking.
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called inside a method of the first type.
type channelMap struct {
	// mu is the global lock guarding every map below.
	mu sync.RWMutex
	// topLevelChannels is the set of ids of channels registered without a parent.
	topLevelChannels map[int64]struct{}
	// The remaining maps hold the registered entities of each kind, keyed by id.
	servers       map[int64]*server
	channels      map[int64]*channel
	subChannels   map[int64]*subChannel
	listenSockets map[int64]*listenSocket
	normalSockets map[int64]*normalSocket
}
    369 
    370 func newChannelMap() *channelMap {
    371 	return &channelMap{
    372 		topLevelChannels: make(map[int64]struct{}),
    373 		channels:         make(map[int64]*channel),
    374 		listenSockets:    make(map[int64]*listenSocket),
    375 		normalSockets:    make(map[int64]*normalSocket),
    376 		servers:          make(map[int64]*server),
    377 		subChannels:      make(map[int64]*subChannel),
    378 	}
    379 }
    380 
    381 func (c *channelMap) addServer(id int64, s *server) {
    382 	c.mu.Lock()
    383 	s.cm = c
    384 	c.servers[id] = s
    385 	c.mu.Unlock()
    386 }
    387 
    388 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
    389 	c.mu.Lock()
    390 	cn.cm = c
    391 	cn.trace.cm = c
    392 	c.channels[id] = cn
    393 	if isTopChannel {
    394 		c.topLevelChannels[id] = struct{}{}
    395 	} else {
    396 		c.findEntry(pid).addChild(id, cn)
    397 	}
    398 	c.mu.Unlock()
    399 }
    400 
    401 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
    402 	c.mu.Lock()
    403 	sc.cm = c
    404 	sc.trace.cm = c
    405 	c.subChannels[id] = sc
    406 	c.findEntry(pid).addChild(id, sc)
    407 	c.mu.Unlock()
    408 }
    409 
    410 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
    411 	c.mu.Lock()
    412 	ls.cm = c
    413 	c.listenSockets[id] = ls
    414 	c.findEntry(pid).addChild(id, ls)
    415 	c.mu.Unlock()
    416 }
    417 
    418 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
    419 	c.mu.Lock()
    420 	ns.cm = c
    421 	c.normalSockets[id] = ns
    422 	c.findEntry(pid).addChild(id, ns)
    423 	c.mu.Unlock()
    424 }
    425 
    426 // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
    427 // wait on the deletion of its children and until no other entity's channel trace references it.
    428 // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
    429 // shutting down server will lead to the server being also deleted.
    430 func (c *channelMap) removeEntry(id int64) {
    431 	c.mu.Lock()
    432 	c.findEntry(id).triggerDelete()
    433 	c.mu.Unlock()
    434 }
    435 
    436 // c.mu must be held by the caller
    437 func (c *channelMap) decrTraceRefCount(id int64) {
    438 	e := c.findEntry(id)
    439 	if v, ok := e.(tracedChannel); ok {
    440 		v.decrTraceRefCount()
    441 		e.deleteSelfIfReady()
    442 	}
    443 }
    444 
    445 // c.mu must be held by the caller.
    446 func (c *channelMap) findEntry(id int64) entry {
    447 	var v entry
    448 	var ok bool
    449 	if v, ok = c.channels[id]; ok {
    450 		return v
    451 	}
    452 	if v, ok = c.subChannels[id]; ok {
    453 		return v
    454 	}
    455 	if v, ok = c.servers[id]; ok {
    456 		return v
    457 	}
    458 	if v, ok = c.listenSockets[id]; ok {
    459 		return v
    460 	}
    461 	if v, ok = c.normalSockets[id]; ok {
    462 		return v
    463 	}
    464 	return &dummyEntry{idNotFound: id}
    465 }
    466 
    467 // c.mu must be held by the caller
    468 // deleteEntry simply deletes an entry from the channelMap. Before calling this
    469 // method, caller must check this entry is ready to be deleted, i.e removeEntry()
    470 // has been called on it, and no children still exist.
    471 // Conditionals are ordered by the expected frequency of deletion of each entity
    472 // type, in order to optimize performance.
    473 func (c *channelMap) deleteEntry(id int64) {
    474 	var ok bool
    475 	if _, ok = c.normalSockets[id]; ok {
    476 		delete(c.normalSockets, id)
    477 		return
    478 	}
    479 	if _, ok = c.subChannels[id]; ok {
    480 		delete(c.subChannels, id)
    481 		return
    482 	}
    483 	if _, ok = c.channels[id]; ok {
    484 		delete(c.channels, id)
    485 		delete(c.topLevelChannels, id)
    486 		return
    487 	}
    488 	if _, ok = c.listenSockets[id]; ok {
    489 		delete(c.listenSockets, id)
    490 		return
    491 	}
    492 	if _, ok = c.servers[id]; ok {
    493 		delete(c.servers, id)
    494 		return
    495 	}
    496 }
    497 
    498 func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
    499 	c.mu.Lock()
    500 	child := c.findEntry(id)
    501 	childTC, ok := child.(tracedChannel)
    502 	if !ok {
    503 		c.mu.Unlock()
    504 		return
    505 	}
    506 	childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
    507 	if desc.Parent != nil {
    508 		parent := c.findEntry(child.getParentID())
    509 		var chanType RefChannelType
    510 		switch child.(type) {
    511 		case *channel:
    512 			chanType = RefChannel
    513 		case *subChannel:
    514 			chanType = RefSubChannel
    515 		}
    516 		if parentTC, ok := parent.(tracedChannel); ok {
    517 			parentTC.getChannelTrace().append(&TraceEvent{
    518 				Desc:      desc.Parent.Desc,
    519 				Severity:  desc.Parent.Severity,
    520 				Timestamp: time.Now(),
    521 				RefID:     id,
    522 				RefName:   childTC.getRefName(),
    523 				RefType:   chanType,
    524 			})
    525 			childTC.incrTraceRefCount()
    526 		}
    527 	}
    528 	c.mu.Unlock()
    529 }
    530 
    531 type int64Slice []int64
    532 
    533 func (s int64Slice) Len() int           { return len(s) }
    534 func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    535 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
    536 
    537 func copyMap(m map[int64]string) map[int64]string {
    538 	n := make(map[int64]string)
    539 	for k, v := range m {
    540 		n[k] = v
    541 	}
    542 	return n
    543 }
    544 
    545 func min(a, b int64) int64 {
    546 	if a < b {
    547 		return a
    548 	}
    549 	return b
    550 }
    551 
    552 func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
    553 	if maxResults <= 0 {
    554 		maxResults = EntryPerPage
    555 	}
    556 	c.mu.RLock()
    557 	l := int64(len(c.topLevelChannels))
    558 	ids := make([]int64, 0, l)
    559 	cns := make([]*channel, 0, min(l, maxResults))
    560 
    561 	for k := range c.topLevelChannels {
    562 		ids = append(ids, k)
    563 	}
    564 	sort.Sort(int64Slice(ids))
    565 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
    566 	count := int64(0)
    567 	var end bool
    568 	var t []*ChannelMetric
    569 	for i, v := range ids[idx:] {
    570 		if count == maxResults {
    571 			break
    572 		}
    573 		if cn, ok := c.channels[v]; ok {
    574 			cns = append(cns, cn)
    575 			t = append(t, &ChannelMetric{
    576 				NestedChans: copyMap(cn.nestedChans),
    577 				SubChans:    copyMap(cn.subChans),
    578 			})
    579 			count++
    580 		}
    581 		if i == len(ids[idx:])-1 {
    582 			end = true
    583 			break
    584 		}
    585 	}
    586 	c.mu.RUnlock()
    587 	if count == 0 {
    588 		end = true
    589 	}
    590 
    591 	for i, cn := range cns {
    592 		t[i].ChannelData = cn.c.ChannelzMetric()
    593 		t[i].ID = cn.id
    594 		t[i].RefName = cn.refName
    595 		t[i].Trace = cn.trace.dumpData()
    596 	}
    597 	return t, end
    598 }
    599 
    600 func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
    601 	if maxResults <= 0 {
    602 		maxResults = EntryPerPage
    603 	}
    604 	c.mu.RLock()
    605 	l := int64(len(c.servers))
    606 	ids := make([]int64, 0, l)
    607 	ss := make([]*server, 0, min(l, maxResults))
    608 	for k := range c.servers {
    609 		ids = append(ids, k)
    610 	}
    611 	sort.Sort(int64Slice(ids))
    612 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
    613 	count := int64(0)
    614 	var end bool
    615 	var s []*ServerMetric
    616 	for i, v := range ids[idx:] {
    617 		if count == maxResults {
    618 			break
    619 		}
    620 		if svr, ok := c.servers[v]; ok {
    621 			ss = append(ss, svr)
    622 			s = append(s, &ServerMetric{
    623 				ListenSockets: copyMap(svr.listenSockets),
    624 			})
    625 			count++
    626 		}
    627 		if i == len(ids[idx:])-1 {
    628 			end = true
    629 			break
    630 		}
    631 	}
    632 	c.mu.RUnlock()
    633 	if count == 0 {
    634 		end = true
    635 	}
    636 
    637 	for i, svr := range ss {
    638 		s[i].ServerData = svr.s.ChannelzMetric()
    639 		s[i].ID = svr.id
    640 		s[i].RefName = svr.refName
    641 	}
    642 	return s, end
    643 }
    644 
    645 func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
    646 	if maxResults <= 0 {
    647 		maxResults = EntryPerPage
    648 	}
    649 	var svr *server
    650 	var ok bool
    651 	c.mu.RLock()
    652 	if svr, ok = c.servers[id]; !ok {
    653 		// server with id doesn't exist.
    654 		c.mu.RUnlock()
    655 		return nil, true
    656 	}
    657 	svrskts := svr.sockets
    658 	l := int64(len(svrskts))
    659 	ids := make([]int64, 0, l)
    660 	sks := make([]*normalSocket, 0, min(l, maxResults))
    661 	for k := range svrskts {
    662 		ids = append(ids, k)
    663 	}
    664 	sort.Sort(int64Slice(ids))
    665 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
    666 	count := int64(0)
    667 	var end bool
    668 	for i, v := range ids[idx:] {
    669 		if count == maxResults {
    670 			break
    671 		}
    672 		if ns, ok := c.normalSockets[v]; ok {
    673 			sks = append(sks, ns)
    674 			count++
    675 		}
    676 		if i == len(ids[idx:])-1 {
    677 			end = true
    678 			break
    679 		}
    680 	}
    681 	c.mu.RUnlock()
    682 	if count == 0 {
    683 		end = true
    684 	}
    685 	s := make([]*SocketMetric, 0, len(sks))
    686 	for _, ns := range sks {
    687 		sm := &SocketMetric{}
    688 		sm.SocketData = ns.s.ChannelzMetric()
    689 		sm.ID = ns.id
    690 		sm.RefName = ns.refName
    691 		s = append(s, sm)
    692 	}
    693 	return s, end
    694 }
    695 
    696 func (c *channelMap) GetChannel(id int64) *ChannelMetric {
    697 	cm := &ChannelMetric{}
    698 	var cn *channel
    699 	var ok bool
    700 	c.mu.RLock()
    701 	if cn, ok = c.channels[id]; !ok {
    702 		// channel with id doesn't exist.
    703 		c.mu.RUnlock()
    704 		return nil
    705 	}
    706 	cm.NestedChans = copyMap(cn.nestedChans)
    707 	cm.SubChans = copyMap(cn.subChans)
    708 	// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
    709 	// holding the lock to prevent potential data race.
    710 	chanCopy := cn.c
    711 	c.mu.RUnlock()
    712 	cm.ChannelData = chanCopy.ChannelzMetric()
    713 	cm.ID = cn.id
    714 	cm.RefName = cn.refName
    715 	cm.Trace = cn.trace.dumpData()
    716 	return cm
    717 }
    718 
    719 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
    720 	cm := &SubChannelMetric{}
    721 	var sc *subChannel
    722 	var ok bool
    723 	c.mu.RLock()
    724 	if sc, ok = c.subChannels[id]; !ok {
    725 		// subchannel with id doesn't exist.
    726 		c.mu.RUnlock()
    727 		return nil
    728 	}
    729 	cm.Sockets = copyMap(sc.sockets)
    730 	// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
    731 	// holding the lock to prevent potential data race.
    732 	chanCopy := sc.c
    733 	c.mu.RUnlock()
    734 	cm.ChannelData = chanCopy.ChannelzMetric()
    735 	cm.ID = sc.id
    736 	cm.RefName = sc.refName
    737 	cm.Trace = sc.trace.dumpData()
    738 	return cm
    739 }
    740 
    741 func (c *channelMap) GetSocket(id int64) *SocketMetric {
    742 	sm := &SocketMetric{}
    743 	c.mu.RLock()
    744 	if ls, ok := c.listenSockets[id]; ok {
    745 		c.mu.RUnlock()
    746 		sm.SocketData = ls.s.ChannelzMetric()
    747 		sm.ID = ls.id
    748 		sm.RefName = ls.refName
    749 		return sm
    750 	}
    751 	if ns, ok := c.normalSockets[id]; ok {
    752 		c.mu.RUnlock()
    753 		sm.SocketData = ns.s.ChannelzMetric()
    754 		sm.ID = ns.id
    755 		sm.RefName = ns.refName
    756 		return sm
    757 	}
    758 	c.mu.RUnlock()
    759 	return nil
    760 }
    761 
    762 func (c *channelMap) GetServer(id int64) *ServerMetric {
    763 	sm := &ServerMetric{}
    764 	var svr *server
    765 	var ok bool
    766 	c.mu.RLock()
    767 	if svr, ok = c.servers[id]; !ok {
    768 		c.mu.RUnlock()
    769 		return nil
    770 	}
    771 	sm.ListenSockets = copyMap(svr.listenSockets)
    772 	c.mu.RUnlock()
    773 	sm.ID = svr.id
    774 	sm.RefName = svr.refName
    775 	sm.ServerData = svr.s.ChannelzMetric()
    776 	return sm
    777 }
    778 
    779 type idGenerator struct {
    780 	id int64
    781 }
    782 
    783 func (i *idGenerator) reset() {
    784 	atomic.StoreInt64(&i.id, 0)
    785 }
    786 
    787 func (i *idGenerator) genID() int64 {
    788 	return atomic.AddInt64(&i.id, 1)
    789 }