gtsocial-umbx

server.go (65142B)


/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.DrainServerTransports = func(srv *Server, addr string) {
		srv.drainServerTransports(addr)
	}
	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
		globalServerOptions = append(globalServerOptions, opt...)
	}
	internal.ClearGlobalServerOptions = func() {
		globalServerOptions = nil
	}
	internal.BinaryLogger = binaryLogger
	internal.JoinServerOptions = newJoinServerOption
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}

type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID *channelz.Identifier
	czData     *channelzData

	serverWorkerChannel chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
var globalServerOptions []ServerOption

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// joinServerOption provides a way to combine an arbitrary number of server
// options into one.
type joinServerOption struct {
	opts []ServerOption
}

func (mdo *joinServerOption) apply(do *serverOptions) {
	for _, opt := range mdo.opts {
		opt.apply(do)
	}
}

func newJoinServerOption(opts ...ServerOption) ServerOption {
	return &joinServerOption{opts: opts}
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
// 32KB. Zero or negative values will disable the write buffer such that each
// write will be on the underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how
// much data can be read at most for one read syscall. The default value for
// this buffer is 32KB. Zero or negative values will disable the read buffer
// for a connection so the data framer can access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}
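
// A caller-side usage sketch pairing the two buffer options above. The 64KB
// values are illustrative assumptions, not recommendations:
//
//	s := grpc.NewServer(
//		grpc.WriteBufferSize(64*1024), // batch up to 64KB before each write
//		grpc.ReadBufferSize(64*1024),  // read up to 64KB per read syscall
//	)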

// InitialWindowSize returns a ServerOption that sets the window size for a
// stream. The lower bound for a window size is 64K and any value smaller than
// that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a
// connection. The lower bound for a window size is 64K and any value smaller
// than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < time.Second {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = time.Second
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}
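
// A caller-side sketch combining both keepalive options; the durations are
// illustrative assumptions only:
//
//	s := grpc.NewServer(
//		grpc.KeepaliveParams(keepalive.ServerParameters{
//			Time:    2 * time.Hour,    // ping an idle client after this long
//			Timeout: 20 * time.Second, // wait this long for a ping ack
//		}),
//		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
//			MinTime: 5 * time.Minute, // reject client pings arriving more often than this
//		}),
//	)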

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}
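
// Sketch raising both limits together from the caller's side; 16MB is an
// arbitrary example value:
//
//	const maxMsgSize = 16 * 1024 * 1024 // 16MB, chosen for illustration
//	s := grpc.NewServer(
//		grpc.MaxRecvMsgSize(maxMsgSize),
//		grpc.MaxSendMsgSize(maxMsgSize),
//	)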

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained
// interceptor for unary RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real
// call. All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained
// interceptor for streaming RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real
// call. All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}
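
// Caller-side sketch of chaining: with ChainUnaryInterceptor(auth, logging),
// a request flows auth -> logging -> handler and the response unwinds in
// reverse. Both interceptors here are hypothetical examples:
//
//	logging := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//		log.Printf("-> %s", info.FullMethod)
//		resp, err := handler(ctx, req) // invoke the rest of the chain
//		log.Printf("<- %s err=%v", info.FullMethod, err)
//		return resp, err
//	}
//	s := grpc.NewServer(grpc.ChainUnaryInterceptor(auth, logging))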

// InTapHandle returns a ServerOption that sets the tap handle for all server
// transports to be created. Only one can be installed.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if h == nil {
			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
			// Do not allow a nil stats handler, which would otherwise cause
			// panics.
			return
		}
		o.statsHandlers = append(o.statsHandlers, h)
	})
}

// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.binaryLogger = bl
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}
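
// Sketch of a catch-all handler from the caller's side (e.g. for a proxy).
// The method name is recovered via grpc.Method; status and codes refer to the
// google.golang.org/grpc/status and codes packages, and the Unimplemented
// reply is purely illustrative:
//
//	proxy := func(srv interface{}, stream grpc.ServerStream) error {
//		method, _ := grpc.Method(stream.Context())
//		return status.Errorf(codes.Unimplemented, "no handler for %q", method)
//	}
//	s := grpc.NewServer(grpc.UnknownServiceHandler(proxy))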

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for streams.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}

// serverWorkerResetThreshold defines how often the goroutine stack must be
// reset. Every N requests, a worker can reset its stack by spawning a new
// goroutine in its place, so that large stacks don't live in memory forever.
// 2^16 should allow each goroutine stack to live for at least a few seconds in
// a typical workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on the server's serverWorkerChannel and waits for data
// to be fed by serveStreams. This allows multiple requests to be processed by
// the same goroutine, removing the need for expensive stack re-allocations
// (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
		data, ok := <-s.serverWorkerChannel
		if !ok {
			return
		}
		s.handleSingleStream(data)
	}
	go s.serverWorker()
}

func (s *Server) handleSingleStream(data *serverWorkerData) {
	defer data.wg.Done()
	s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
}

// initServerWorkers creates worker goroutines and a channel to process
// incoming streams, to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannel = make(chan *serverWorkerData)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		go s.serverWorker()
	}
}

func (s *Server) stopServerWorkers() {
	close(s.serverWorkerChannel)
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range globalServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	channelz.Info(logger, s.channelzID, "Server created")
	return s
}

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}
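
// Typical registration flow through IDL-generated code; pb, greeterServer,
// and lis are hypothetical stand-ins for a generated package, an
// implementation type, and a listener:
//
//	s := grpc.NewServer()
//	pb.RegisterGreeterServer(s, &greeterServer{}) // calls s.RegisterService under the hood
//	if err := s.Serve(lis); err != nil {
//		log.Fatalf("serve: %v", err)
//	}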

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains information about an RPC, including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}
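
// Sketch: enumerating registered services and methods, e.g. for debugging:
//
//	for svc, info := range s.GetServiceInfo() {
//		for _, m := range info.Methods {
//			log.Printf("%s/%s (client stream: %t, server stream: %t)",
//				svc, m.Name, m.IsClientStream, m.IsServerStream)
//		}
//	}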

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

type listenSocket struct {
	net.Listener
	channelzID *channelz.Identifier
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	channelz.RemoveEntry(l.channelzID)
	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
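
// The canonical caller-side accept-loop setup; the address is an arbitrary
// example:
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatalf("listen: %v", err)
//	}
//	s := grpc.NewServer()
//	// ... register services here ...
//	if err := s.Serve(lis); err != nil {
//		log.Fatalf("serve: %v", err)
//	}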

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(rawConn)
	rawConn.SetDeadline(time.Time{})
	if st == nil {
		return
	}

	if !s.addConn(lisAddr, st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(lisAddr, st)
	}()
}

func (s *Server) drainServerTransports(addr string) {
	s.mu.Lock()
	conns := s.conns[addr]
	for st := range conns {
		st.Drain()
	}
	s.mu.Unlock()
}

// newHTTP2Transport sets up an HTTP/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		ConnectionTimeout:     s.opts.connectionTimeout,
		Credentials:           s.opts.creds,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandlers:         s.opts.statsHandlers,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport(c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
			if err != io.EOF {
				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
			}
			c.Close()
		}
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close(errors.New("finished serving streams for the server transport"))
	var wg sync.WaitGroup

	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannel <- data:
				return
			default:
				// If all stream workers are busy, fallback to the default code path.
			}
		}
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
	if err != nil {
		// Errors returned from transport.NewServerHandlerTransport have
		// already been written to w.
		return
	}
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	s.serveStreams(st)
}
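
// Sketch of wiring ServeHTTP behind the standard library's TLS server, using
// the Content-Type split from the doc comment above; grpcServer, mux,
// certFile, and keyFile are assumed to exist in the caller's code:
//
//	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
//			grpcServer.ServeHTTP(w, r) // hand gRPC traffic to this server
//		} else {
//			mux.ServeHTTP(w, r) // everything else goes to the regular mux
//		}
//	})
//	log.Fatal(http.ListenAndServeTLS(":443", certFile, keyFile, handler))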

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close(errors.New("Server.addConn called when server has already been stopped"))
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}

	if s.conns[addr] == nil {
		// Create a map entry if this is the first connection on this listener.
		s.conns[addr] = make(map[transport.ServerTransport]bool)
	}
	s.conns[addr][st] = true
	return true
}

func (s *Server) removeConn(addr string, st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()

	conns := s.conns[addr]
	if conns != nil {
		delete(conns, st)
		if len(conns) == 0 {
			// If the last connection for this address is being removed, also
			// remove the map entry corresponding to the address. This is used
			// in GracefulStop() when waiting for all connections to be closed.
			delete(s.conns, addr)
		}
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil {
		for _, sh := range s.opts.statsHandlers {
			sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
		}
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainUnaryInterceptors(interceptors)
	}

	s.opts.unaryInt = chainedInt
}

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}
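
// Worked example of the recursion above: with interceptors [i0, i1, i2] and
// final handler h, the chain expands to
//
//	i0(ctx, req, info, wrap(i1(..., wrap(i2(..., h)))))
//
// where each wrap is the closure returned by getChainUnaryHandler. So i0 runs
// first, h runs last, and the return values unwind in reverse order, matching
// the ordering documented on ChainUnaryInterceptor.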
   1159 
   1160 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
   1161 	shs := s.opts.statsHandlers
   1162 	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
   1163 		if channelz.IsOn() {
   1164 			s.incrCallsStarted()
   1165 		}
   1166 		var statsBegin *stats.Begin
   1167 		for _, sh := range shs {
   1168 			beginTime := time.Now()
   1169 			statsBegin = &stats.Begin{
   1170 				BeginTime:      beginTime,
   1171 				IsClientStream: false,
   1172 				IsServerStream: false,
   1173 			}
   1174 			sh.HandleRPC(stream.Context(), statsBegin)
   1175 		}
   1176 		if trInfo != nil {
   1177 			trInfo.tr.LazyLog(&trInfo.firstLine, false)
   1178 		}
   1179 		// The deferred error handling for tracing, stats handler and channelz are
   1180 		// combined into one function to reduce stack usage -- a defer takes ~56-64
   1181 		// bytes on the stack, so overflowing the stack will require a stack
   1182 		// re-allocation, which is expensive.
   1183 		//
   1184 		// To maintain behavior similar to separate deferred statements, statements
   1185 		// should be executed in the reverse order. That is, tracing first, stats
   1186 		// handler second, and channelz last. Note that panics *within* defers will
   1187 		// lead to different behavior, but that's an acceptable compromise; that
   1188 		// would be undefined behavior territory anyway.
   1189 		defer func() {
   1190 			if trInfo != nil {
   1191 				if err != nil && err != io.EOF {
   1192 					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1193 					trInfo.tr.SetError()
   1194 				}
   1195 				trInfo.tr.Finish()
   1196 			}
   1197 
   1198 			for _, sh := range shs {
   1199 				end := &stats.End{
   1200 					BeginTime: statsBegin.BeginTime,
   1201 					EndTime:   time.Now(),
   1202 				}
   1203 				if err != nil && err != io.EOF {
   1204 					end.Error = toRPCErr(err)
   1205 				}
   1206 				sh.HandleRPC(stream.Context(), end)
   1207 			}
   1208 
   1209 			if channelz.IsOn() {
   1210 				if err != nil && err != io.EOF {
   1211 					s.incrCallsFailed()
   1212 				} else {
   1213 					s.incrCallsSucceeded()
   1214 				}
   1215 			}
   1216 		}()
   1217 	}
   1218 	var binlogs []binarylog.MethodLogger
   1219 	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
   1220 		binlogs = append(binlogs, ml)
   1221 	}
   1222 	if s.opts.binaryLogger != nil {
   1223 		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
   1224 			binlogs = append(binlogs, ml)
   1225 		}
   1226 	}
   1227 	if len(binlogs) != 0 {
   1228 		ctx := stream.Context()
   1229 		md, _ := metadata.FromIncomingContext(ctx)
   1230 		logEntry := &binarylog.ClientHeader{
   1231 			Header:     md,
   1232 			MethodName: stream.Method(),
   1233 			PeerAddr:   nil,
   1234 		}
   1235 		if deadline, ok := ctx.Deadline(); ok {
   1236 			logEntry.Timeout = time.Until(deadline)
   1237 			if logEntry.Timeout < 0 {
   1238 				logEntry.Timeout = 0
   1239 			}
   1240 		}
   1241 		if a := md[":authority"]; len(a) > 0 {
   1242 			logEntry.Authority = a[0]
   1243 		}
   1244 		if peer, ok := peer.FromContext(ctx); ok {
   1245 			logEntry.PeerAddr = peer.Addr
   1246 		}
   1247 		for _, binlog := range binlogs {
   1248 			binlog.Log(ctx, logEntry)
   1249 		}
   1250 	}
   1251 
   1252 	// comp and cp are used for compression.  decomp and dc are used for
   1253 	// decompression.  If comp and decomp are both set, they are the same;
   1254 	// however they are kept separate to ensure that at most one of the
   1255 	// compressor/decompressor variable pairs are set for use later.
   1256 	var comp, decomp encoding.Compressor
   1257 	var cp Compressor
   1258 	var dc Decompressor
   1259 	var sendCompressorName string
   1260 
   1261 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
   1262 	// to find a matching registered compressor for decomp.
   1263 	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
   1264 		dc = s.opts.dc
   1265 	} else if rc != "" && rc != encoding.Identity {
   1266 		decomp = encoding.GetCompressor(rc)
   1267 		if decomp == nil {
   1268 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
   1269 			t.WriteStatus(stream, st)
   1270 			return st.Err()
   1271 		}
   1272 	}
   1273 
   1274 	// If cp is set, use it.  Otherwise, attempt to compress the response using
   1275 	// the incoming message compression method.
   1276 	//
   1277 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
   1278 	if s.opts.cp != nil {
   1279 		cp = s.opts.cp
   1280 		sendCompressorName = cp.Type()
   1281 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
   1282 		// Legacy compressor not specified; attempt to respond with same encoding.
   1283 		comp = encoding.GetCompressor(rc)
   1284 		if comp != nil {
   1285 			sendCompressorName = comp.Name()
   1286 		}
   1287 	}
   1288 
   1289 	if sendCompressorName != "" {
   1290 		if err := stream.SetSendCompress(sendCompressorName); err != nil {
   1291 			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
   1292 		}
   1293 	}
   1294 
   1295 	var payInfo *payloadInfo
   1296 	if len(shs) != 0 || len(binlogs) != 0 {
   1297 		payInfo = &payloadInfo{}
   1298 	}
   1299 	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
   1300 	if err != nil {
   1301 		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
   1302 			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
   1303 		}
   1304 		return err
   1305 	}
   1306 	if channelz.IsOn() {
   1307 		t.IncrMsgRecv()
   1308 	}
   1309 	df := func(v interface{}) error {
   1310 		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
   1311 			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
   1312 		}
   1313 		for _, sh := range shs {
   1314 			sh.HandleRPC(stream.Context(), &stats.InPayload{
   1315 				RecvTime:         time.Now(),
   1316 				Payload:          v,
   1317 				Length:           len(d),
   1318 				WireLength:       payInfo.compressedLength + headerLen,
   1319 				CompressedLength: payInfo.compressedLength,
   1320 				Data:             d,
   1321 			})
   1322 		}
   1323 		if len(binlogs) != 0 {
   1324 			cm := &binarylog.ClientMessage{
   1325 				Message: d,
   1326 			}
   1327 			for _, binlog := range binlogs {
   1328 				binlog.Log(stream.Context(), cm)
   1329 			}
   1330 		}
   1331 		if trInfo != nil {
   1332 			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
   1333 		}
   1334 		return nil
   1335 	}
   1336 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
   1337 	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
   1338 	if appErr != nil {
   1339 		appStatus, ok := status.FromError(appErr)
   1340 		if !ok {
   1341 			// Convert non-status application error to a status error with code
   1342 			// Unknown, but handle context errors specifically.
   1343 			appStatus = status.FromContextError(appErr)
   1344 			appErr = appStatus.Err()
   1345 		}
   1346 		if trInfo != nil {
   1347 			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
   1348 			trInfo.tr.SetError()
   1349 		}
   1350 		if e := t.WriteStatus(stream, appStatus); e != nil {
   1351 			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
   1352 		}
   1353 		if len(binlogs) != 0 {
   1354 			if h, _ := stream.Header(); h.Len() > 0 {
   1355 				// Only log serverHeader if there was header. Otherwise it can
   1356 				// be trailer only.
   1357 				sh := &binarylog.ServerHeader{
   1358 					Header: h,
   1359 				}
   1360 				for _, binlog := range binlogs {
   1361 					binlog.Log(stream.Context(), sh)
   1362 				}
   1363 			}
   1364 			st := &binarylog.ServerTrailer{
   1365 				Trailer: stream.Trailer(),
   1366 				Err:     appErr,
   1367 			}
   1368 			for _, binlog := range binlogs {
   1369 				binlog.Log(stream.Context(), st)
   1370 			}
   1371 		}
   1372 		return appErr
   1373 	}
   1374 	if trInfo != nil {
   1375 		trInfo.tr.LazyLog(stringer("OK"), false)
   1376 	}
   1377 	opts := &transport.Options{Last: true}
   1378 
   1379 	// Server handler could have set new compressor by calling SetSendCompressor.
   1380 	// In case it is set, we need to use it for compressing outbound message.
   1381 	if stream.SendCompress() != sendCompressorName {
   1382 		comp = encoding.GetCompressor(stream.SendCompress())
   1383 	}
   1384 	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
   1385 		if err == io.EOF {
   1386 			// The entire stream is done (for unary RPC only).
   1387 			return err
   1388 		}
   1389 		if sts, ok := status.FromError(err); ok {
   1390 			if e := t.WriteStatus(stream, sts); e != nil {
   1391 				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
   1392 			}
   1393 		} else {
   1394 			switch st := err.(type) {
   1395 			case transport.ConnectionError:
   1396 				// Nothing to do here.
   1397 			default:
   1398 				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
   1399 			}
   1400 		}
   1401 		if len(binlogs) != 0 {
   1402 			h, _ := stream.Header()
   1403 			sh := &binarylog.ServerHeader{
   1404 				Header: h,
   1405 			}
   1406 			st := &binarylog.ServerTrailer{
   1407 				Trailer: stream.Trailer(),
   1408 				Err:     appErr,
   1409 			}
   1410 			for _, binlog := range binlogs {
   1411 				binlog.Log(stream.Context(), sh)
   1412 				binlog.Log(stream.Context(), st)
   1413 			}
   1414 		}
   1415 		return err
   1416 	}
   1417 	if len(binlogs) != 0 {
   1418 		h, _ := stream.Header()
   1419 		sh := &binarylog.ServerHeader{
   1420 			Header: h,
   1421 		}
   1422 		sm := &binarylog.ServerMessage{
   1423 			Message: reply,
   1424 		}
   1425 		for _, binlog := range binlogs {
   1426 			binlog.Log(stream.Context(), sh)
   1427 			binlog.Log(stream.Context(), sm)
   1428 		}
   1429 	}
   1430 	if channelz.IsOn() {
   1431 		t.IncrMsgSent()
   1432 	}
   1433 	if trInfo != nil {
   1434 		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
   1435 	}
   1436 	// TODO: Should we be logging if writing status failed here, like above?
   1437 	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
   1438 	// error or allow the stats handler to see it?
   1439 	if len(binlogs) != 0 {
   1440 		st := &binarylog.ServerTrailer{
   1441 			Trailer: stream.Trailer(),
   1442 			Err:     appErr,
   1443 		}
   1444 		for _, binlog := range binlogs {
   1445 			binlog.Log(stream.Context(), st)
   1446 		}
   1447 	}
   1448 	return t.WriteStatus(stream, statusOK)
   1449 }
   1450 
   1451 // chainStreamServerInterceptors chains all stream server interceptors into one.
   1452 func chainStreamServerInterceptors(s *Server) {
   1453 	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
   1454 	// be executed before any other chained interceptors.
   1455 	interceptors := s.opts.chainStreamInts
   1456 	if s.opts.streamInt != nil {
   1457 		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
   1458 	}
   1459 
   1460 	var chainedInt StreamServerInterceptor
   1461 	if len(interceptors) == 0 {
   1462 		chainedInt = nil
   1463 	} else if len(interceptors) == 1 {
   1464 		chainedInt = interceptors[0]
   1465 	} else {
   1466 		chainedInt = chainStreamInterceptors(interceptors)
   1467 	}
   1468 
   1469 	s.opts.streamInt = chainedInt
   1470 }
   1471 
   1472 func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
   1473 	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
   1474 		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
   1475 	}
   1476 }
   1477 
   1478 func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
   1479 	if curr == len(interceptors)-1 {
   1480 		return finalHandler
   1481 	}
   1482 	return func(srv interface{}, stream ServerStream) error {
   1483 		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
   1484 	}
   1485 }
   1486 
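         // Illustrative sketch (not part of the original source): how chained
         // stream interceptors compose from a user's point of view. With
         // grpc.ChainStreamInterceptor(a, b), a wraps b, which wraps the service
         // handler, so a observes the RPC first. The logging interceptor below
         // is hypothetical.
         //
         //	logging := func(srv interface{}, ss grpc.ServerStream,
         //		info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
         //		log.Printf("stream start: %s", info.FullMethod)
         //		err := handler(srv, ss)
         //		log.Printf("stream end: %s err=%v", info.FullMethod, err)
         //		return err
         //	}
         //	srv := grpc.NewServer(grpc.ChainStreamInterceptor(logging))
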
   1487 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
   1488 	if channelz.IsOn() {
   1489 		s.incrCallsStarted()
   1490 	}
   1491 	shs := s.opts.statsHandlers
   1492 	var statsBegin *stats.Begin
   1493 	if len(shs) != 0 {
   1494 		beginTime := time.Now()
   1495 		statsBegin = &stats.Begin{
   1496 			BeginTime:      beginTime,
   1497 			IsClientStream: sd.ClientStreams,
   1498 			IsServerStream: sd.ServerStreams,
   1499 		}
   1500 		for _, sh := range shs {
   1501 			sh.HandleRPC(stream.Context(), statsBegin)
   1502 		}
   1503 	}
   1504 	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
   1505 	ss := &serverStream{
   1506 		ctx:                   ctx,
   1507 		t:                     t,
   1508 		s:                     stream,
   1509 		p:                     &parser{r: stream},
   1510 		codec:                 s.getCodec(stream.ContentSubtype()),
   1511 		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
   1512 		maxSendMessageSize:    s.opts.maxSendMessageSize,
   1513 		trInfo:                trInfo,
   1514 		statsHandler:          shs,
   1515 	}
   1516 
   1517 	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
   1518 		// See comment in processUnaryRPC on defers.
   1519 		defer func() {
   1520 			if trInfo != nil {
   1521 				ss.mu.Lock()
   1522 				if err != nil && err != io.EOF {
   1523 					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1524 					ss.trInfo.tr.SetError()
   1525 				}
   1526 				ss.trInfo.tr.Finish()
   1527 				ss.trInfo.tr = nil
   1528 				ss.mu.Unlock()
   1529 			}
   1530 
   1531 			if len(shs) != 0 {
   1532 				end := &stats.End{
   1533 					BeginTime: statsBegin.BeginTime,
   1534 					EndTime:   time.Now(),
   1535 				}
   1536 				if err != nil && err != io.EOF {
   1537 					end.Error = toRPCErr(err)
   1538 				}
   1539 				for _, sh := range shs {
   1540 					sh.HandleRPC(stream.Context(), end)
   1541 				}
   1542 			}
   1543 
   1544 			if channelz.IsOn() {
   1545 				if err != nil && err != io.EOF {
   1546 					s.incrCallsFailed()
   1547 				} else {
   1548 					s.incrCallsSucceeded()
   1549 				}
   1550 			}
   1551 		}()
   1552 	}
   1553 
   1554 	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
   1555 		ss.binlogs = append(ss.binlogs, ml)
   1556 	}
   1557 	if s.opts.binaryLogger != nil {
   1558 		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
   1559 			ss.binlogs = append(ss.binlogs, ml)
   1560 		}
   1561 	}
   1562 	if len(ss.binlogs) != 0 {
   1563 		md, _ := metadata.FromIncomingContext(ctx)
   1564 		logEntry := &binarylog.ClientHeader{
   1565 			Header:     md,
   1566 			MethodName: stream.Method(),
   1567 			PeerAddr:   nil,
   1568 		}
   1569 		if deadline, ok := ctx.Deadline(); ok {
   1570 			logEntry.Timeout = time.Until(deadline)
   1571 			if logEntry.Timeout < 0 {
   1572 				logEntry.Timeout = 0
   1573 			}
   1574 		}
   1575 		if a := md[":authority"]; len(a) > 0 {
   1576 			logEntry.Authority = a[0]
   1577 		}
   1578 		if peer, ok := peer.FromContext(ss.Context()); ok {
   1579 			logEntry.PeerAddr = peer.Addr
   1580 		}
   1581 		for _, binlog := range ss.binlogs {
   1582 			binlog.Log(stream.Context(), logEntry)
   1583 		}
   1584 	}
   1585 
   1586 	// If dc is set and matches the stream's compression, use it.  Otherwise, try
   1587 	// to find a matching registered compressor for decomp.
   1588 	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
   1589 		ss.dc = s.opts.dc
   1590 	} else if rc != "" && rc != encoding.Identity {
   1591 		ss.decomp = encoding.GetCompressor(rc)
   1592 		if ss.decomp == nil {
   1593 			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
   1594 			t.WriteStatus(ss.s, st)
   1595 			return st.Err()
   1596 		}
   1597 	}
   1598 
   1599 	// If cp is set, use it.  Otherwise, attempt to compress the response using
   1600 	// the incoming message compression method.
   1601 	//
   1602 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
   1603 	if s.opts.cp != nil {
   1604 		ss.cp = s.opts.cp
   1605 		ss.sendCompressorName = s.opts.cp.Type()
   1606 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
   1607 		// Legacy compressor not specified; attempt to respond with same encoding.
   1608 		ss.comp = encoding.GetCompressor(rc)
   1609 		if ss.comp != nil {
   1610 			ss.sendCompressorName = rc
   1611 		}
   1612 	}
   1613 
   1614 	if ss.sendCompressorName != "" {
   1615 		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
   1616 			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
   1617 		}
   1618 	}
   1619 
   1620 	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
   1621 
   1622 	if trInfo != nil {
   1623 		trInfo.tr.LazyLog(&trInfo.firstLine, false)
   1624 	}
   1625 	var appErr error
   1626 	var server interface{}
   1627 	if info != nil {
   1628 		server = info.serviceImpl
   1629 	}
   1630 	if s.opts.streamInt == nil {
   1631 		appErr = sd.Handler(server, ss)
   1632 	} else {
   1633 		info := &StreamServerInfo{
   1634 			FullMethod:     stream.Method(),
   1635 			IsClientStream: sd.ClientStreams,
   1636 			IsServerStream: sd.ServerStreams,
   1637 		}
   1638 		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
   1639 	}
   1640 	if appErr != nil {
   1641 		appStatus, ok := status.FromError(appErr)
   1642 		if !ok {
   1643 			// Convert non-status application error to a status error with code
   1644 			// Unknown, but handle context errors specifically.
   1645 			appStatus = status.FromContextError(appErr)
   1646 			appErr = appStatus.Err()
   1647 		}
   1648 		if trInfo != nil {
   1649 			ss.mu.Lock()
   1650 			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
   1651 			ss.trInfo.tr.SetError()
   1652 			ss.mu.Unlock()
   1653 		}
   1654 		if len(ss.binlogs) != 0 {
   1655 			st := &binarylog.ServerTrailer{
   1656 				Trailer: ss.s.Trailer(),
   1657 				Err:     appErr,
   1658 			}
   1659 			for _, binlog := range ss.binlogs {
   1660 				binlog.Log(stream.Context(), st)
   1661 			}
   1662 		}
   1663 		t.WriteStatus(ss.s, appStatus)
   1664 		// TODO: Should we log an error from WriteStatus here and below?
   1665 		return appErr
   1666 	}
   1667 	if trInfo != nil {
   1668 		ss.mu.Lock()
   1669 		ss.trInfo.tr.LazyLog(stringer("OK"), false)
   1670 		ss.mu.Unlock()
   1671 	}
   1672 	if len(ss.binlogs) != 0 {
   1673 		st := &binarylog.ServerTrailer{
   1674 			Trailer: ss.s.Trailer(),
   1675 			Err:     appErr,
   1676 		}
   1677 		for _, binlog := range ss.binlogs {
   1678 			binlog.Log(stream.Context(), st)
   1679 		}
   1680 	}
   1681 	return t.WriteStatus(ss.s, statusOK)
   1682 }
   1683 
   1684 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
   1685 	sm := stream.Method()
   1686 	if sm != "" && sm[0] == '/' {
   1687 		sm = sm[1:]
   1688 	}
   1689 	pos := strings.LastIndex(sm, "/")
   1690 	if pos == -1 {
   1691 		if trInfo != nil {
   1692 			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
   1693 			trInfo.tr.SetError()
   1694 		}
   1695 		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
   1696 		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
   1697 			if trInfo != nil {
   1698 				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1699 				trInfo.tr.SetError()
   1700 			}
   1701 			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
   1702 		}
   1703 		if trInfo != nil {
   1704 			trInfo.tr.Finish()
   1705 		}
   1706 		return
   1707 	}
   1708 	service := sm[:pos]
   1709 	method := sm[pos+1:]
   1710 
   1711 	srv, knownService := s.services[service]
   1712 	if knownService {
   1713 		if md, ok := srv.methods[method]; ok {
   1714 			s.processUnaryRPC(t, stream, srv, md, trInfo)
   1715 			return
   1716 		}
   1717 		if sd, ok := srv.streams[method]; ok {
   1718 			s.processStreamingRPC(t, stream, srv, sd, trInfo)
   1719 			return
   1720 		}
   1721 	}
    1722 	// Unknown service, or known service with an unknown method.
   1723 	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
   1724 		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
   1725 		return
   1726 	}
   1727 	var errDesc string
   1728 	if !knownService {
   1729 		errDesc = fmt.Sprintf("unknown service %v", service)
   1730 	} else {
   1731 		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
   1732 	}
   1733 	if trInfo != nil {
   1734 		trInfo.tr.LazyPrintf("%s", errDesc)
   1735 		trInfo.tr.SetError()
   1736 	}
   1737 	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
   1738 		if trInfo != nil {
   1739 			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
   1740 			trInfo.tr.SetError()
   1741 		}
   1742 		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
   1743 	}
   1744 	if trInfo != nil {
   1745 		trInfo.tr.Finish()
   1746 	}
   1747 }
   1748 
   1749 // The key to save ServerTransportStream in the context.
   1750 type streamKey struct{}
   1751 
   1752 // NewContextWithServerTransportStream creates a new context from ctx and
   1753 // attaches stream to it.
   1754 //
   1755 // # Experimental
   1756 //
   1757 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   1758 // later release.
   1759 func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
   1760 	return context.WithValue(ctx, streamKey{}, stream)
   1761 }
   1762 
   1763 // ServerTransportStream is a minimal interface that a transport stream must
   1764 // implement. This can be used to mock an actual transport stream for tests of
   1765 // handler code that use, for example, grpc.SetHeader (which requires some
   1766 // stream to be in context).
   1767 //
   1768 // See also NewContextWithServerTransportStream.
   1769 //
   1770 // # Experimental
   1771 //
   1772 // Notice: This type is EXPERIMENTAL and may be changed or removed in a
   1773 // later release.
   1774 type ServerTransportStream interface {
   1775 	Method() string
   1776 	SetHeader(md metadata.MD) error
   1777 	SendHeader(md metadata.MD) error
   1778 	SetTrailer(md metadata.MD) error
   1779 }
   1780 
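         // Illustrative sketch (not part of the original source): a minimal mock
         // of ServerTransportStream for handler tests, attached to a context via
         // NewContextWithServerTransportStream. The mockStream type is
         // hypothetical.
         //
         //	type mockStream struct {
         //		method          string
         //		header, trailer metadata.MD
         //	}
         //
         //	func (m *mockStream) Method() string                  { return m.method }
         //	func (m *mockStream) SetHeader(md metadata.MD) error  { m.header = metadata.Join(m.header, md); return nil }
         //	func (m *mockStream) SendHeader(md metadata.MD) error { return m.SetHeader(md) }
         //	func (m *mockStream) SetTrailer(md metadata.MD) error { m.trailer = metadata.Join(m.trailer, md); return nil }
         //
         //	ctx := grpc.NewContextWithServerTransportStream(context.Background(), &mockStream{method: "/svc/Method"})
         //	_ = grpc.SetHeader(ctx, metadata.Pairs("k", "v")) // succeeds against the mock
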
   1781 // ServerTransportStreamFromContext returns the ServerTransportStream saved in
   1782 // ctx. Returns nil if the given context has no stream associated with it
   1783 // (which implies it is not an RPC invocation context).
   1784 //
   1785 // # Experimental
   1786 //
   1787 // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   1788 // later release.
   1789 func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
   1790 	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
   1791 	return s
   1792 }
   1793 
   1794 // Stop stops the gRPC server. It immediately closes all open
   1795 // connections and listeners.
    1796 // It cancels all active RPCs on the server side; the corresponding
    1797 // pending RPCs on the client side are notified by connection
   1798 // errors.
   1799 func (s *Server) Stop() {
   1800 	s.quit.Fire()
   1801 
   1802 	defer func() {
   1803 		s.serveWG.Wait()
   1804 		s.done.Fire()
   1805 	}()
   1806 
   1807 	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
   1808 
   1809 	s.mu.Lock()
   1810 	listeners := s.lis
   1811 	s.lis = nil
   1812 	conns := s.conns
   1813 	s.conns = nil
   1814 	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
   1815 	s.cv.Broadcast()
   1816 	s.mu.Unlock()
   1817 
   1818 	for lis := range listeners {
   1819 		lis.Close()
   1820 	}
   1821 	for _, cs := range conns {
   1822 		for st := range cs {
   1823 			st.Close(errors.New("Server.Stop called"))
   1824 		}
   1825 	}
   1826 	if s.opts.numServerWorkers > 0 {
   1827 		s.stopServerWorkers()
   1828 	}
   1829 
   1830 	s.mu.Lock()
   1831 	if s.events != nil {
   1832 		s.events.Finish()
   1833 		s.events = nil
   1834 	}
   1835 	s.mu.Unlock()
   1836 }
   1837 
   1838 // GracefulStop stops the gRPC server gracefully. It stops the server from
   1839 // accepting new connections and RPCs and blocks until all the pending RPCs are
   1840 // finished.
   1841 func (s *Server) GracefulStop() {
   1842 	s.quit.Fire()
   1843 	defer s.done.Fire()
   1844 
   1845 	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
   1846 	s.mu.Lock()
   1847 	if s.conns == nil {
   1848 		s.mu.Unlock()
   1849 		return
   1850 	}
   1851 
   1852 	for lis := range s.lis {
   1853 		lis.Close()
   1854 	}
   1855 	s.lis = nil
   1856 	if !s.drain {
   1857 		for _, conns := range s.conns {
   1858 			for st := range conns {
   1859 				st.Drain()
   1860 			}
   1861 		}
   1862 		s.drain = true
   1863 	}
   1864 
   1865 	// Wait for serving threads to be ready to exit.  Only then can we be sure no
   1866 	// new conns will be created.
   1867 	s.mu.Unlock()
   1868 	s.serveWG.Wait()
   1869 	s.mu.Lock()
   1870 
   1871 	for len(s.conns) != 0 {
   1872 		s.cv.Wait()
   1873 	}
   1874 	s.conns = nil
   1875 	if s.events != nil {
   1876 		s.events.Finish()
   1877 		s.events = nil
   1878 	}
   1879 	s.mu.Unlock()
   1880 }
   1881 
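         // Illustrative sketch (not part of the original source): a common
         // shutdown pattern that prefers GracefulStop but falls back to Stop
         // after a deadline. The srv variable and the timeout are hypothetical.
         //
         //	stopped := make(chan struct{})
         //	go func() {
         //		srv.GracefulStop() // waits for in-flight RPCs to finish
         //		close(stopped)
         //	}()
         //	select {
         //	case <-stopped:
         //	case <-time.After(10 * time.Second):
         //		srv.Stop() // force-close remaining connections
         //	}
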
    1882 // getCodec returns the codec to use for the given content-subtype, which
    1883 // must be lowercase. It never returns nil.
   1884 func (s *Server) getCodec(contentSubtype string) baseCodec {
   1885 	if s.opts.codec != nil {
   1886 		return s.opts.codec
   1887 	}
   1888 	if contentSubtype == "" {
   1889 		return encoding.GetCodec(proto.Name)
   1890 	}
   1891 	codec := encoding.GetCodec(contentSubtype)
   1892 	if codec == nil {
   1893 		return encoding.GetCodec(proto.Name)
   1894 	}
   1895 	return codec
   1896 }
   1897 
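         // Illustrative note (not part of the original source): codec lookup is
         // driven by the request's content-subtype, so a client sending
         // "application/grpc+json" is served by the codec registered under the
         // name "json". The jsonCodec type below is hypothetical; it must
         // implement encoding.Codec.
         //
         //	encoding.RegisterCodec(jsonCodec{}) // jsonCodec.Name() returns "json"
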
   1898 // SetHeader sets the header metadata to be sent from the server to the client.
   1899 // The context provided must be the context passed to the server's handler.
   1900 //
   1901 // Streaming RPCs should prefer the SetHeader method of the ServerStream.
   1902 //
   1903 // When called multiple times, all the provided metadata will be merged.  All
   1904 // the metadata will be sent out when one of the following happens:
   1905 //
   1906 //   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
   1907 //   - The first response message is sent.  For unary handlers, this occurs when
   1908 //     the handler returns; for streaming handlers, this can happen when stream's
   1909 //     SendMsg method is called.
   1910 //   - An RPC status is sent out (error or success).  This occurs when the handler
   1911 //     returns.
   1912 //
   1913 // SetHeader will fail if called after any of the events above.
   1914 //
   1915 // The error returned is compatible with the status package.  However, the
   1916 // status code will often not match the RPC status as seen by the client
   1917 // application, and therefore, should not be relied upon for this purpose.
   1918 func SetHeader(ctx context.Context, md metadata.MD) error {
   1919 	if md.Len() == 0 {
   1920 		return nil
   1921 	}
   1922 	stream := ServerTransportStreamFromContext(ctx)
   1923 	if stream == nil {
   1924 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   1925 	}
   1926 	return stream.SetHeader(md)
   1927 }
   1928 
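         // Illustrative sketch (not part of the original source): attaching
         // header metadata from a unary handler. The server, request, and
         // response types are hypothetical.
         //
         //	func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
         //		// Merged with any metadata set earlier; flushed when the first
         //		// response message or the RPC status is sent.
         //		if err := grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "abc123")); err != nil {
         //			return nil, err
         //		}
         //		return &pb.EchoResponse{Message: req.GetMessage()}, nil
         //	}
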
   1929 // SendHeader sends header metadata. It may be called at most once, and may not
   1930 // be called after any event that causes headers to be sent (see SetHeader for
   1931 // a complete list).  The provided md and headers set by SetHeader() will be
   1932 // sent.
   1933 //
   1934 // The error returned is compatible with the status package.  However, the
   1935 // status code will often not match the RPC status as seen by the client
   1936 // application, and therefore, should not be relied upon for this purpose.
   1937 func SendHeader(ctx context.Context, md metadata.MD) error {
   1938 	stream := ServerTransportStreamFromContext(ctx)
   1939 	if stream == nil {
   1940 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   1941 	}
   1942 	if err := stream.SendHeader(md); err != nil {
   1943 		return toRPCErr(err)
   1944 	}
   1945 	return nil
   1946 }
   1947 
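         // Illustrative sketch (not part of the original source): forcing headers
         // onto the wire before the first response message, e.g. so the client
         // can observe them early. Assumes a handler context as above.
         //
         //	_ = grpc.SetHeader(ctx, metadata.Pairs("x-a", "1"))
         //	// Sends x-a and x-b immediately; subsequent SetHeader calls fail.
         //	if err := grpc.SendHeader(ctx, metadata.Pairs("x-b", "2")); err != nil {
         //		return nil, err
         //	}
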
   1948 // SetSendCompressor sets a compressor for outbound messages from the server.
   1949 // It must not be called after any event that causes headers to be sent
    1950 // (see ServerStream.SetHeader for the complete list). The provided compressor
    1951 // is used when the following conditions are met:
   1952 //
    1953 //   - the compressor is registered via encoding.RegisterCompressor
    1954 //   - the compressor name exists in the compressor names advertised by the
    1955 //     client in the grpc-accept-encoding header. Use ClientSupportedCompressors
    1956 //     to get the client-supported compressor names.
   1957 //
   1958 // The context provided must be the context passed to the server's handler.
    1959 // Note that the compressor name encoding.Identity disables outbound
    1960 // compression.
    1961 // By default, server messages are sent using the same compressor with
    1962 // which the request messages were sent.
   1963 //
   1964 // It is not safe to call SetSendCompressor concurrently with SendHeader and
   1965 // SendMsg.
   1966 //
   1967 // # Experimental
   1968 //
   1969 // Notice: This function is EXPERIMENTAL and may be changed or removed in a
   1970 // later release.
   1971 func SetSendCompressor(ctx context.Context, name string) error {
   1972 	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
   1973 	if !ok || stream == nil {
   1974 		return fmt.Errorf("failed to fetch the stream from the given context")
   1975 	}
   1976 
   1977 	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
   1978 		return fmt.Errorf("unable to set send compressor: %w", err)
   1979 	}
   1980 
   1981 	return stream.SetSendCompress(name)
   1982 }
   1983 
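         // Illustrative sketch (not part of the original source): responding with
         // gzip only when the client advertised support for it. Assumes the gzip
         // compressor is registered, e.g. via a blank import of
         // google.golang.org/grpc/encoding/gzip.
         //
         //	if comps, err := grpc.ClientSupportedCompressors(ctx); err == nil {
         //		for _, c := range comps {
         //			if c == "gzip" {
         //				_ = grpc.SetSendCompressor(ctx, "gzip")
         //				break
         //			}
         //		}
         //	}
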
   1984 // ClientSupportedCompressors returns compressor names advertised by the client
   1985 // via grpc-accept-encoding header.
   1986 //
   1987 // The context provided must be the context passed to the server's handler.
   1988 //
   1989 // # Experimental
   1990 //
   1991 // Notice: This function is EXPERIMENTAL and may be changed or removed in a
   1992 // later release.
   1993 func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
   1994 	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
   1995 	if !ok || stream == nil {
   1996 		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
   1997 	}
   1998 
   1999 	return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
   2000 }
   2001 
   2002 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
   2003 // When called more than once, all the provided metadata will be merged.
   2004 //
   2005 // The error returned is compatible with the status package.  However, the
   2006 // status code will often not match the RPC status as seen by the client
   2007 // application, and therefore, should not be relied upon for this purpose.
   2008 func SetTrailer(ctx context.Context, md metadata.MD) error {
   2009 	if md.Len() == 0 {
   2010 		return nil
   2011 	}
   2012 	stream := ServerTransportStreamFromContext(ctx)
   2013 	if stream == nil {
   2014 		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
   2015 	}
   2016 	return stream.SetTrailer(md)
   2017 }
   2018 
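         // Illustrative sketch (not part of the original source): recording a
         // trailer from a unary handler; trailers are sent with the RPC status
         // when the handler returns. The x-duration key is hypothetical.
         //
         //	start := time.Now()
         //	defer func() {
         //		_ = grpc.SetTrailer(ctx, metadata.Pairs("x-duration", time.Since(start).String()))
         //	}()
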
   2019 // Method returns the method string for the server context.  The returned
   2020 // string is in the format of "/service/method".
   2021 func Method(ctx context.Context) (string, bool) {
   2022 	s := ServerTransportStreamFromContext(ctx)
   2023 	if s == nil {
   2024 		return "", false
   2025 	}
   2026 	return s.Method(), true
   2027 }
   2028 
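         // Illustrative sketch (not part of the original source): reading the
         // method name inside a unary interceptor, where it matches
         // info.FullMethod.
         //
         //	intercept := func(ctx context.Context, req interface{},
         //		info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
         //		if m, ok := grpc.Method(ctx); ok {
         //			log.Printf("handling %s", m)
         //		}
         //		return handler(ctx, req)
         //	}
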
   2029 type channelzServer struct {
   2030 	s *Server
   2031 }
   2032 
   2033 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
   2034 	return c.s.channelzMetric()
   2035 }
   2036 
    2037 // validateSendCompressor returns an error when the given compressor name is
    2038 // not registered with the server or is not among the client's advertised
    2039 // compressors.
   2039 func validateSendCompressor(name, clientCompressors string) error {
   2040 	if name == encoding.Identity {
   2041 		return nil
   2042 	}
   2043 
   2044 	if !grpcutil.IsCompressorNameRegistered(name) {
   2045 		return fmt.Errorf("compressor not registered %q", name)
   2046 	}
   2047 
   2048 	for _, c := range strings.Split(clientCompressors, ",") {
   2049 		if c == name {
   2050 			return nil // found match
   2051 		}
   2052 	}
   2053 	return fmt.Errorf("client does not support compressor %q", name)
   2054 }