server.go (65142B)
1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package grpc 20 21 import ( 22 "context" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "sync/atomic" 34 "time" 35 36 "golang.org/x/net/trace" 37 38 "google.golang.org/grpc/codes" 39 "google.golang.org/grpc/credentials" 40 "google.golang.org/grpc/encoding" 41 "google.golang.org/grpc/encoding/proto" 42 "google.golang.org/grpc/grpclog" 43 "google.golang.org/grpc/internal" 44 "google.golang.org/grpc/internal/binarylog" 45 "google.golang.org/grpc/internal/channelz" 46 "google.golang.org/grpc/internal/grpcsync" 47 "google.golang.org/grpc/internal/grpcutil" 48 "google.golang.org/grpc/internal/transport" 49 "google.golang.org/grpc/keepalive" 50 "google.golang.org/grpc/metadata" 51 "google.golang.org/grpc/peer" 52 "google.golang.org/grpc/stats" 53 "google.golang.org/grpc/status" 54 "google.golang.org/grpc/tap" 55 ) 56 57 const ( 58 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 59 defaultServerMaxSendMessageSize = math.MaxInt32 60 61 // Server transports are tracked in a map which is keyed on listener 62 // address. For regular gRPC traffic, connections are accepted in Serve() 63 // through a call to Accept(), and we use the actual listener address as key 64 // when we add it to the map. But for connections received through 65 // ServeHTTP(), we do not have a listener and hence use this dummy value. 66 listenerAddressForServeHTTP = "listenerAddressForServeHTTP" 67 ) 68 69 func init() { 70 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { 71 return srv.opts.creds 72 } 73 internal.DrainServerTransports = func(srv *Server, addr string) { 74 srv.drainServerTransports(addr) 75 } 76 internal.AddGlobalServerOptions = func(opt ...ServerOption) { 77 globalServerOptions = append(globalServerOptions, opt...) 78 } 79 internal.ClearGlobalServerOptions = func() { 80 globalServerOptions = nil 81 } 82 internal.BinaryLogger = binaryLogger 83 internal.JoinServerOptions = newJoinServerOption 84 } 85 86 var statusOK = status.New(codes.OK, "") 87 var logger = grpclog.Component("core") 88 89 type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 90 91 // MethodDesc represents an RPC service's method specification. 92 type MethodDesc struct { 93 MethodName string 94 Handler methodHandler 95 } 96 97 // ServiceDesc represents an RPC service's specification. 98 type ServiceDesc struct { 99 ServiceName string 100 // The pointer to the service interface. Used to check whether the user 101 // provided implementation satisfies the interface requirements. 102 HandlerType interface{} 103 Methods []MethodDesc 104 Streams []StreamDesc 105 Metadata interface{} 106 } 107 108 // serviceInfo wraps information about a service. It is very similar to 109 // ServiceDesc and is constructed from it for internal purposes. 110 type serviceInfo struct { 111 // Contains the implementation for the methods in this service. 112 serviceImpl interface{} 113 methods map[string]*MethodDesc 114 streams map[string]*StreamDesc 115 mdata interface{} 116 } 117 118 type serverWorkerData struct { 119 st transport.ServerTransport 120 wg *sync.WaitGroup 121 stream *transport.Stream 122 } 123 124 // Server is a gRPC server to serve RPC requests. 125 type Server struct { 126 opts serverOptions 127 128 mu sync.Mutex // guards following 129 lis map[net.Listener]bool 130 // conns contains all active server transports. It is a map keyed on a 131 // listener address with the value being the set of active transports 132 // belonging to that listener. 133 conns map[string]map[transport.ServerTransport]bool 134 serve bool 135 drain bool 136 cv *sync.Cond // signaled when connections close for GracefulStop 137 services map[string]*serviceInfo // service name -> service info 138 events trace.EventLog 139 140 quit *grpcsync.Event 141 done *grpcsync.Event 142 channelzRemoveOnce sync.Once 143 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 144 145 channelzID *channelz.Identifier 146 czData *channelzData 147 148 serverWorkerChannel chan *serverWorkerData 149 } 150 151 type serverOptions struct { 152 creds credentials.TransportCredentials 153 codec baseCodec 154 cp Compressor 155 dc Decompressor 156 unaryInt UnaryServerInterceptor 157 streamInt StreamServerInterceptor 158 chainUnaryInts []UnaryServerInterceptor 159 chainStreamInts []StreamServerInterceptor 160 binaryLogger binarylog.Logger 161 inTapHandle tap.ServerInHandle 162 statsHandlers []stats.Handler 163 maxConcurrentStreams uint32 164 maxReceiveMessageSize int 165 maxSendMessageSize int 166 unknownStreamDesc *StreamDesc 167 keepaliveParams keepalive.ServerParameters 168 keepalivePolicy keepalive.EnforcementPolicy 169 initialWindowSize int32 170 initialConnWindowSize int32 171 writeBufferSize int 172 readBufferSize int 173 connectionTimeout time.Duration 174 maxHeaderListSize *uint32 175 headerTableSize *uint32 176 numServerWorkers uint32 177 } 178 179 var defaultServerOptions = serverOptions{ 180 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 181 maxSendMessageSize: defaultServerMaxSendMessageSize, 182 connectionTimeout: 120 * time.Second, 183 writeBufferSize: defaultWriteBufSize, 184 readBufferSize: defaultReadBufSize, 185 } 186 var globalServerOptions []ServerOption 187 188 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 189 type ServerOption interface { 190 apply(*serverOptions) 191 } 192 193 // EmptyServerOption does not alter the server configuration. It can be embedded 194 // in another structure to build custom server options. 195 // 196 // # Experimental 197 // 198 // Notice: This type is EXPERIMENTAL and may be changed or removed in a 199 // later release. 200 type EmptyServerOption struct{} 201 202 func (EmptyServerOption) apply(*serverOptions) {} 203 204 // funcServerOption wraps a function that modifies serverOptions into an 205 // implementation of the ServerOption interface. 206 type funcServerOption struct { 207 f func(*serverOptions) 208 } 209 210 func (fdo *funcServerOption) apply(do *serverOptions) { 211 fdo.f(do) 212 } 213 214 func newFuncServerOption(f func(*serverOptions)) *funcServerOption { 215 return &funcServerOption{ 216 f: f, 217 } 218 } 219 220 // joinServerOption provides a way to combine arbitrary number of server 221 // options into one. 222 type joinServerOption struct { 223 opts []ServerOption 224 } 225 226 func (mdo *joinServerOption) apply(do *serverOptions) { 227 for _, opt := range mdo.opts { 228 opt.apply(do) 229 } 230 } 231 232 func newJoinServerOption(opts ...ServerOption) ServerOption { 233 return &joinServerOption{opts: opts} 234 } 235 236 // WriteBufferSize determines how much data can be batched before doing a write 237 // on the wire. The corresponding memory allocation for this buffer will be 238 // twice the size to keep syscalls low. The default value for this buffer is 239 // 32KB. Zero or negative values will disable the write buffer such that each 240 // write will be on underlying connection. 241 // Note: A Send call may not directly translate to a write. 242 func WriteBufferSize(s int) ServerOption { 243 return newFuncServerOption(func(o *serverOptions) { 244 o.writeBufferSize = s 245 }) 246 } 247 248 // ReadBufferSize lets you set the size of read buffer, this determines how much 249 // data can be read at most for one read syscall. The default value for this 250 // buffer is 32KB. Zero or negative values will disable read buffer for a 251 // connection so data framer can access the underlying conn directly. 252 func ReadBufferSize(s int) ServerOption { 253 return newFuncServerOption(func(o *serverOptions) { 254 o.readBufferSize = s 255 }) 256 } 257 258 // InitialWindowSize returns a ServerOption that sets window size for stream. 259 // The lower bound for window size is 64K and any value smaller than that will be ignored. 260 func InitialWindowSize(s int32) ServerOption { 261 return newFuncServerOption(func(o *serverOptions) { 262 o.initialWindowSize = s 263 }) 264 } 265 266 // InitialConnWindowSize returns a ServerOption that sets window size for a connection. 267 // The lower bound for window size is 64K and any value smaller than that will be ignored. 268 func InitialConnWindowSize(s int32) ServerOption { 269 return newFuncServerOption(func(o *serverOptions) { 270 o.initialConnWindowSize = s 271 }) 272 } 273 274 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 275 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 276 if kp.Time > 0 && kp.Time < time.Second { 277 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") 278 kp.Time = time.Second 279 } 280 281 return newFuncServerOption(func(o *serverOptions) { 282 o.keepaliveParams = kp 283 }) 284 } 285 286 // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 287 func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 288 return newFuncServerOption(func(o *serverOptions) { 289 o.keepalivePolicy = kep 290 }) 291 } 292 293 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 294 // 295 // This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 296 // 297 // Deprecated: register codecs using encoding.RegisterCodec. The server will 298 // automatically use registered codecs based on the incoming requests' headers. 299 // See also 300 // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 301 // Will be supported throughout 1.x. 302 func CustomCodec(codec Codec) ServerOption { 303 return newFuncServerOption(func(o *serverOptions) { 304 o.codec = codec 305 }) 306 } 307 308 // ForceServerCodec returns a ServerOption that sets a codec for message 309 // marshaling and unmarshaling. 310 // 311 // This will override any lookups by content-subtype for Codecs registered 312 // with RegisterCodec. 313 // 314 // See Content-Type on 315 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 316 // more details. Also see the documentation on RegisterCodec and 317 // CallContentSubtype for more details on the interaction between encoding.Codec 318 // and content-subtype. 319 // 320 // This function is provided for advanced users; prefer to register codecs 321 // using encoding.RegisterCodec. 322 // The server will automatically use registered codecs based on the incoming 323 // requests' headers. See also 324 // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. 325 // Will be supported throughout 1.x. 326 // 327 // # Experimental 328 // 329 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 330 // later release. 331 func ForceServerCodec(codec encoding.Codec) ServerOption { 332 return newFuncServerOption(func(o *serverOptions) { 333 o.codec = codec 334 }) 335 } 336 337 // RPCCompressor returns a ServerOption that sets a compressor for outbound 338 // messages. For backward compatibility, all outbound messages will be sent 339 // using this compressor, regardless of incoming message compression. By 340 // default, server messages will be sent using the same compressor with which 341 // request messages were sent. 342 // 343 // Deprecated: use encoding.RegisterCompressor instead. Will be supported 344 // throughout 1.x. 345 func RPCCompressor(cp Compressor) ServerOption { 346 return newFuncServerOption(func(o *serverOptions) { 347 o.cp = cp 348 }) 349 } 350 351 // RPCDecompressor returns a ServerOption that sets a decompressor for inbound 352 // messages. It has higher priority than decompressors registered via 353 // encoding.RegisterCompressor. 354 // 355 // Deprecated: use encoding.RegisterCompressor instead. Will be supported 356 // throughout 1.x. 357 func RPCDecompressor(dc Decompressor) ServerOption { 358 return newFuncServerOption(func(o *serverOptions) { 359 o.dc = dc 360 }) 361 } 362 363 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 364 // If this is not set, gRPC uses the default limit. 365 // 366 // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. 367 func MaxMsgSize(m int) ServerOption { 368 return MaxRecvMsgSize(m) 369 } 370 371 // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 372 // If this is not set, gRPC uses the default 4MB. 373 func MaxRecvMsgSize(m int) ServerOption { 374 return newFuncServerOption(func(o *serverOptions) { 375 o.maxReceiveMessageSize = m 376 }) 377 } 378 379 // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 380 // If this is not set, gRPC uses the default `math.MaxInt32`. 381 func MaxSendMsgSize(m int) ServerOption { 382 return newFuncServerOption(func(o *serverOptions) { 383 o.maxSendMessageSize = m 384 }) 385 } 386 387 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 388 // of concurrent streams to each ServerTransport. 389 func MaxConcurrentStreams(n uint32) ServerOption { 390 return newFuncServerOption(func(o *serverOptions) { 391 o.maxConcurrentStreams = n 392 }) 393 } 394 395 // Creds returns a ServerOption that sets credentials for server connections. 396 func Creds(c credentials.TransportCredentials) ServerOption { 397 return newFuncServerOption(func(o *serverOptions) { 398 o.creds = c 399 }) 400 } 401 402 // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 403 // server. Only one unary interceptor can be installed. The construction of multiple 404 // interceptors (e.g., chaining) can be implemented at the caller. 405 func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 406 return newFuncServerOption(func(o *serverOptions) { 407 if o.unaryInt != nil { 408 panic("The unary server interceptor was already set and may not be reset.") 409 } 410 o.unaryInt = i 411 }) 412 } 413 414 // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor 415 // for unary RPCs. The first interceptor will be the outer most, 416 // while the last interceptor will be the inner most wrapper around the real call. 417 // All unary interceptors added by this method will be chained. 418 func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { 419 return newFuncServerOption(func(o *serverOptions) { 420 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...) 421 }) 422 } 423 424 // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 425 // server. Only one stream interceptor can be installed. 426 func StreamInterceptor(i StreamServerInterceptor) ServerOption { 427 return newFuncServerOption(func(o *serverOptions) { 428 if o.streamInt != nil { 429 panic("The stream server interceptor was already set and may not be reset.") 430 } 431 o.streamInt = i 432 }) 433 } 434 435 // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor 436 // for streaming RPCs. The first interceptor will be the outer most, 437 // while the last interceptor will be the inner most wrapper around the real call. 438 // All stream interceptors added by this method will be chained. 439 func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { 440 return newFuncServerOption(func(o *serverOptions) { 441 o.chainStreamInts = append(o.chainStreamInts, interceptors...) 442 }) 443 } 444 445 // InTapHandle returns a ServerOption that sets the tap handle for all the server 446 // transport to be created. Only one can be installed. 447 // 448 // # Experimental 449 // 450 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 451 // later release. 452 func InTapHandle(h tap.ServerInHandle) ServerOption { 453 return newFuncServerOption(func(o *serverOptions) { 454 if o.inTapHandle != nil { 455 panic("The tap handle was already set and may not be reset.") 456 } 457 o.inTapHandle = h 458 }) 459 } 460 461 // StatsHandler returns a ServerOption that sets the stats handler for the server. 462 func StatsHandler(h stats.Handler) ServerOption { 463 return newFuncServerOption(func(o *serverOptions) { 464 if h == nil { 465 logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption") 466 // Do not allow a nil stats handler, which would otherwise cause 467 // panics. 468 return 469 } 470 o.statsHandlers = append(o.statsHandlers, h) 471 }) 472 } 473 474 // binaryLogger returns a ServerOption that can set the binary logger for the 475 // server. 476 func binaryLogger(bl binarylog.Logger) ServerOption { 477 return newFuncServerOption(func(o *serverOptions) { 478 o.binaryLogger = bl 479 }) 480 } 481 482 // UnknownServiceHandler returns a ServerOption that allows for adding a custom 483 // unknown service handler. The provided method is a bidi-streaming RPC service 484 // handler that will be invoked instead of returning the "unimplemented" gRPC 485 // error whenever a request is received for an unregistered service or method. 486 // The handling function and stream interceptor (if set) have full access to 487 // the ServerStream, including its Context. 488 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 489 return newFuncServerOption(func(o *serverOptions) { 490 o.unknownStreamDesc = &StreamDesc{ 491 StreamName: "unknown_service_handler", 492 Handler: streamHandler, 493 // We need to assume that the users of the streamHandler will want to use both. 494 ClientStreams: true, 495 ServerStreams: true, 496 } 497 }) 498 } 499 500 // ConnectionTimeout returns a ServerOption that sets the timeout for 501 // connection establishment (up to and including HTTP/2 handshaking) for all 502 // new connections. If this is not set, the default is 120 seconds. A zero or 503 // negative value will result in an immediate timeout. 504 // 505 // # Experimental 506 // 507 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 508 // later release. 509 func ConnectionTimeout(d time.Duration) ServerOption { 510 return newFuncServerOption(func(o *serverOptions) { 511 o.connectionTimeout = d 512 }) 513 } 514 515 // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size 516 // of header list that the server is prepared to accept. 517 func MaxHeaderListSize(s uint32) ServerOption { 518 return newFuncServerOption(func(o *serverOptions) { 519 o.maxHeaderListSize = &s 520 }) 521 } 522 523 // HeaderTableSize returns a ServerOption that sets the size of dynamic 524 // header table for stream. 525 // 526 // # Experimental 527 // 528 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 529 // later release. 530 func HeaderTableSize(s uint32) ServerOption { 531 return newFuncServerOption(func(o *serverOptions) { 532 o.headerTableSize = &s 533 }) 534 } 535 536 // NumStreamWorkers returns a ServerOption that sets the number of worker 537 // goroutines that should be used to process incoming streams. Setting this to 538 // zero (default) will disable workers and spawn a new goroutine for each 539 // stream. 540 // 541 // # Experimental 542 // 543 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 544 // later release. 545 func NumStreamWorkers(numServerWorkers uint32) ServerOption { 546 // TODO: If/when this API gets stabilized (i.e. stream workers become the 547 // only way streams are processed), change the behavior of the zero value to 548 // a sane default. Preliminary experiments suggest that a value equal to the 549 // number of CPUs available is most performant; requires thorough testing. 550 return newFuncServerOption(func(o *serverOptions) { 551 o.numServerWorkers = numServerWorkers 552 }) 553 } 554 555 // serverWorkerResetThreshold defines how often the stack must be reset. Every 556 // N requests, by spawning a new goroutine in its place, a worker can reset its 557 // stack so that large stacks don't live in memory forever. 2^16 should allow 558 // each goroutine stack to live for at least a few seconds in a typical 559 // workload (assuming a QPS of a few thousand requests/sec). 560 const serverWorkerResetThreshold = 1 << 16 561 562 // serverWorkers blocks on a *transport.Stream channel forever and waits for 563 // data to be fed by serveStreams. This allows multiple requests to be 564 // processed by the same goroutine, removing the need for expensive stack 565 // re-allocations (see the runtime.morestack problem [1]). 566 // 567 // [1] https://github.com/golang/go/issues/18138 568 func (s *Server) serverWorker() { 569 for completed := 0; completed < serverWorkerResetThreshold; completed++ { 570 data, ok := <-s.serverWorkerChannel 571 if !ok { 572 return 573 } 574 s.handleSingleStream(data) 575 } 576 go s.serverWorker() 577 } 578 579 func (s *Server) handleSingleStream(data *serverWorkerData) { 580 defer data.wg.Done() 581 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) 582 } 583 584 // initServerWorkers creates worker goroutines and a channel to process incoming 585 // connections to reduce the time spent overall on runtime.morestack. 586 func (s *Server) initServerWorkers() { 587 s.serverWorkerChannel = make(chan *serverWorkerData) 588 for i := uint32(0); i < s.opts.numServerWorkers; i++ { 589 go s.serverWorker() 590 } 591 } 592 593 func (s *Server) stopServerWorkers() { 594 close(s.serverWorkerChannel) 595 } 596 597 // NewServer creates a gRPC server which has no service registered and has not 598 // started to accept requests yet. 599 func NewServer(opt ...ServerOption) *Server { 600 opts := defaultServerOptions 601 for _, o := range globalServerOptions { 602 o.apply(&opts) 603 } 604 for _, o := range opt { 605 o.apply(&opts) 606 } 607 s := &Server{ 608 lis: make(map[net.Listener]bool), 609 opts: opts, 610 conns: make(map[string]map[transport.ServerTransport]bool), 611 services: make(map[string]*serviceInfo), 612 quit: grpcsync.NewEvent(), 613 done: grpcsync.NewEvent(), 614 czData: new(channelzData), 615 } 616 chainUnaryServerInterceptors(s) 617 chainStreamServerInterceptors(s) 618 s.cv = sync.NewCond(&s.mu) 619 if EnableTracing { 620 _, file, line, _ := runtime.Caller(1) 621 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 622 } 623 624 if s.opts.numServerWorkers > 0 { 625 s.initServerWorkers() 626 } 627 628 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") 629 channelz.Info(logger, s.channelzID, "Server created") 630 return s 631 } 632 633 // printf records an event in s's event log, unless s has been stopped. 634 // REQUIRES s.mu is held. 635 func (s *Server) printf(format string, a ...interface{}) { 636 if s.events != nil { 637 s.events.Printf(format, a...) 638 } 639 } 640 641 // errorf records an error in s's event log, unless s has been stopped. 642 // REQUIRES s.mu is held. 643 func (s *Server) errorf(format string, a ...interface{}) { 644 if s.events != nil { 645 s.events.Errorf(format, a...) 646 } 647 } 648 649 // ServiceRegistrar wraps a single method that supports service registration. It 650 // enables users to pass concrete types other than grpc.Server to the service 651 // registration methods exported by the IDL generated code. 652 type ServiceRegistrar interface { 653 // RegisterService registers a service and its implementation to the 654 // concrete type implementing this interface. It may not be called 655 // once the server has started serving. 656 // desc describes the service and its methods and handlers. impl is the 657 // service implementation which is passed to the method handlers. 658 RegisterService(desc *ServiceDesc, impl interface{}) 659 } 660 661 // RegisterService registers a service and its implementation to the gRPC 662 // server. It is called from the IDL generated code. This must be called before 663 // invoking Serve. If ss is non-nil (for legacy code), its type is checked to 664 // ensure it implements sd.HandlerType. 665 func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 666 if ss != nil { 667 ht := reflect.TypeOf(sd.HandlerType).Elem() 668 st := reflect.TypeOf(ss) 669 if !st.Implements(ht) { 670 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 671 } 672 } 673 s.register(sd, ss) 674 } 675 676 func (s *Server) register(sd *ServiceDesc, ss interface{}) { 677 s.mu.Lock() 678 defer s.mu.Unlock() 679 s.printf("RegisterService(%q)", sd.ServiceName) 680 if s.serve { 681 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 682 } 683 if _, ok := s.services[sd.ServiceName]; ok { 684 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 685 } 686 info := &serviceInfo{ 687 serviceImpl: ss, 688 methods: make(map[string]*MethodDesc), 689 streams: make(map[string]*StreamDesc), 690 mdata: sd.Metadata, 691 } 692 for i := range sd.Methods { 693 d := &sd.Methods[i] 694 info.methods[d.MethodName] = d 695 } 696 for i := range sd.Streams { 697 d := &sd.Streams[i] 698 info.streams[d.StreamName] = d 699 } 700 s.services[sd.ServiceName] = info 701 } 702 703 // MethodInfo contains the information of an RPC including its method name and type. 704 type MethodInfo struct { 705 // Name is the method name only, without the service name or package name. 706 Name string 707 // IsClientStream indicates whether the RPC is a client streaming RPC. 708 IsClientStream bool 709 // IsServerStream indicates whether the RPC is a server streaming RPC. 710 IsServerStream bool 711 } 712 713 // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 714 type ServiceInfo struct { 715 Methods []MethodInfo 716 // Metadata is the metadata specified in ServiceDesc when registering service. 717 Metadata interface{} 718 } 719 720 // GetServiceInfo returns a map from service names to ServiceInfo. 721 // Service names include the package names, in the form of <package>.<service>. 722 func (s *Server) GetServiceInfo() map[string]ServiceInfo { 723 ret := make(map[string]ServiceInfo) 724 for n, srv := range s.services { 725 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams)) 726 for m := range srv.methods { 727 methods = append(methods, MethodInfo{ 728 Name: m, 729 IsClientStream: false, 730 IsServerStream: false, 731 }) 732 } 733 for m, d := range srv.streams { 734 methods = append(methods, MethodInfo{ 735 Name: m, 736 IsClientStream: d.ClientStreams, 737 IsServerStream: d.ServerStreams, 738 }) 739 } 740 741 ret[n] = ServiceInfo{ 742 Methods: methods, 743 Metadata: srv.mdata, 744 } 745 } 746 return ret 747 } 748 749 // ErrServerStopped indicates that the operation is now illegal because of 750 // the server being stopped. 751 var ErrServerStopped = errors.New("grpc: the server has been stopped") 752 753 type listenSocket struct { 754 net.Listener 755 channelzID *channelz.Identifier 756 } 757 758 func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 759 return &channelz.SocketInternalMetric{ 760 SocketOptions: channelz.GetSocketOption(l.Listener), 761 LocalAddr: l.Listener.Addr(), 762 } 763 } 764 765 func (l *listenSocket) Close() error { 766 err := l.Listener.Close() 767 channelz.RemoveEntry(l.channelzID) 768 channelz.Info(logger, l.channelzID, "ListenSocket deleted") 769 return err 770 } 771 772 // Serve accepts incoming connections on the listener lis, creating a new 773 // ServerTransport and service goroutine for each. The service goroutines 774 // read gRPC requests and then call the registered handlers to reply to them. 775 // Serve returns when lis.Accept fails with fatal errors. lis will be closed when 776 // this method returns. 777 // Serve will return a non-nil error unless Stop or GracefulStop is called. 778 func (s *Server) Serve(lis net.Listener) error { 779 s.mu.Lock() 780 s.printf("serving") 781 s.serve = true 782 if s.lis == nil { 783 // Serve called after Stop or GracefulStop. 784 s.mu.Unlock() 785 lis.Close() 786 return ErrServerStopped 787 } 788 789 s.serveWG.Add(1) 790 defer func() { 791 s.serveWG.Done() 792 if s.quit.HasFired() { 793 // Stop or GracefulStop called; block until done and return nil. 794 <-s.done.Done() 795 } 796 }() 797 798 ls := &listenSocket{Listener: lis} 799 s.lis[ls] = true 800 801 defer func() { 802 s.mu.Lock() 803 if s.lis != nil && s.lis[ls] { 804 ls.Close() 805 delete(s.lis, ls) 806 } 807 s.mu.Unlock() 808 }() 809 810 var err error 811 ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) 812 if err != nil { 813 s.mu.Unlock() 814 return err 815 } 816 s.mu.Unlock() 817 channelz.Info(logger, ls.channelzID, "ListenSocket created") 818 819 var tempDelay time.Duration // how long to sleep on accept failure 820 for { 821 rawConn, err := lis.Accept() 822 if err != nil { 823 if ne, ok := err.(interface { 824 Temporary() bool 825 }); ok && ne.Temporary() { 826 if tempDelay == 0 { 827 tempDelay = 5 * time.Millisecond 828 } else { 829 tempDelay *= 2 830 } 831 if max := 1 * time.Second; tempDelay > max { 832 tempDelay = max 833 } 834 s.mu.Lock() 835 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 836 s.mu.Unlock() 837 timer := time.NewTimer(tempDelay) 838 select { 839 case <-timer.C: 840 case <-s.quit.Done(): 841 timer.Stop() 842 return nil 843 } 844 continue 845 } 846 s.mu.Lock() 847 s.printf("done serving; Accept = %v", err) 848 s.mu.Unlock() 849 850 if s.quit.HasFired() { 851 return nil 852 } 853 return err 854 } 855 tempDelay = 0 856 // Start a new goroutine to deal with rawConn so we don't stall this Accept 857 // loop goroutine. 858 // 859 // Make sure we account for the goroutine so GracefulStop doesn't nil out 860 // s.conns before this conn can be added. 861 s.serveWG.Add(1) 862 go func() { 863 s.handleRawConn(lis.Addr().String(), rawConn) 864 s.serveWG.Done() 865 }() 866 } 867 } 868 869 // handleRawConn forks a goroutine to handle a just-accepted connection that 870 // has not had any I/O performed on it yet. 871 func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { 872 if s.quit.HasFired() { 873 rawConn.Close() 874 return 875 } 876 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 877 878 // Finish handshaking (HTTP2) 879 st := s.newHTTP2Transport(rawConn) 880 rawConn.SetDeadline(time.Time{}) 881 if st == nil { 882 return 883 } 884 885 if !s.addConn(lisAddr, st) { 886 return 887 } 888 go func() { 889 s.serveStreams(st) 890 s.removeConn(lisAddr, st) 891 }() 892 } 893 894 func (s *Server) drainServerTransports(addr string) { 895 s.mu.Lock() 896 conns := s.conns[addr] 897 for st := range conns { 898 st.Drain() 899 } 900 s.mu.Unlock() 901 } 902 903 // newHTTP2Transport sets up a http/2 transport (using the 904 // gRPC http2 server transport in transport/http2_server.go). 905 func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { 906 config := &transport.ServerConfig{ 907 MaxStreams: s.opts.maxConcurrentStreams, 908 ConnectionTimeout: s.opts.connectionTimeout, 909 Credentials: s.opts.creds, 910 InTapHandle: s.opts.inTapHandle, 911 StatsHandlers: s.opts.statsHandlers, 912 KeepaliveParams: s.opts.keepaliveParams, 913 KeepalivePolicy: s.opts.keepalivePolicy, 914 InitialWindowSize: s.opts.initialWindowSize, 915 InitialConnWindowSize: s.opts.initialConnWindowSize, 916 WriteBufferSize: s.opts.writeBufferSize, 917 ReadBufferSize: s.opts.readBufferSize, 918 ChannelzParentID: s.channelzID, 919 MaxHeaderListSize: s.opts.maxHeaderListSize, 920 HeaderTableSize: s.opts.headerTableSize, 921 } 922 st, err := transport.NewServerTransport(c, config) 923 if err != nil { 924 s.mu.Lock() 925 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 926 s.mu.Unlock() 927 // ErrConnDispatched means that the connection was dispatched away from 928 // gRPC; those connections should be left open. 929 if err != credentials.ErrConnDispatched { 930 // Don't log on ErrConnDispatched and io.EOF to prevent log spam. 931 if err != io.EOF { 932 channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) 933 } 934 c.Close() 935 } 936 return nil 937 } 938 939 return st 940 } 941 942 func (s *Server) serveStreams(st transport.ServerTransport) { 943 defer st.Close(errors.New("finished serving streams for the server transport")) 944 var wg sync.WaitGroup 945 946 st.HandleStreams(func(stream *transport.Stream) { 947 wg.Add(1) 948 if s.opts.numServerWorkers > 0 { 949 data := &serverWorkerData{st: st, wg: &wg, stream: stream} 950 select { 951 case s.serverWorkerChannel <- data: 952 return 953 default: 954 // If all stream workers are busy, fallback to the default code path. 955 } 956 } 957 go func() { 958 defer wg.Done() 959 s.handleStream(st, stream, s.traceInfo(st, stream)) 960 }() 961 }, func(ctx context.Context, method string) context.Context { 962 if !EnableTracing { 963 return ctx 964 } 965 tr := trace.New("grpc.Recv."+methodFamily(method), method) 966 return trace.NewContext(ctx, tr) 967 }) 968 wg.Wait() 969 } 970 971 var _ http.Handler = (*Server)(nil) 972 973 // ServeHTTP implements the Go standard library's http.Handler 974 // interface by responding to the gRPC request r, by looking up 975 // the requested gRPC method in the gRPC server s. 976 // 977 // The provided HTTP request must have arrived on an HTTP/2 978 // connection. When using the Go standard library's server, 979 // practically this means that the Request must also have arrived 980 // over TLS. 981 // 982 // To share one port (such as 443 for https) between gRPC and an 983 // existing http.Handler, use a root http.Handler such as: 984 // 985 // if r.ProtoMajor == 2 && strings.HasPrefix( 986 // r.Header.Get("Content-Type"), "application/grpc") { 987 // grpcServer.ServeHTTP(w, r) 988 // } else { 989 // yourMux.ServeHTTP(w, r) 990 // } 991 // 992 // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 993 // separate from grpc-go's HTTP/2 server. Performance and features may vary 994 // between the two paths. ServeHTTP does not support some gRPC features 995 // available through grpc-go's HTTP/2 server. 996 // 997 // # Experimental 998 // 999 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 1000 // later release. 1001 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 1002 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers) 1003 if err != nil { 1004 // Errors returned from transport.NewServerHandlerTransport have 1005 // already been written to w. 1006 return 1007 } 1008 if !s.addConn(listenerAddressForServeHTTP, st) { 1009 return 1010 } 1011 defer s.removeConn(listenerAddressForServeHTTP, st) 1012 s.serveStreams(st) 1013 } 1014 1015 // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 1016 // If tracing is not enabled, it returns nil. 1017 func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 1018 if !EnableTracing { 1019 return nil 1020 } 1021 tr, ok := trace.FromContext(stream.Context()) 1022 if !ok { 1023 return nil 1024 } 1025 1026 trInfo = &traceInfo{ 1027 tr: tr, 1028 firstLine: firstLine{ 1029 client: false, 1030 remoteAddr: st.RemoteAddr(), 1031 }, 1032 } 1033 if dl, ok := stream.Context().Deadline(); ok { 1034 trInfo.firstLine.deadline = time.Until(dl) 1035 } 1036 return trInfo 1037 } 1038 1039 func (s *Server) addConn(addr string, st transport.ServerTransport) bool { 1040 s.mu.Lock() 1041 defer s.mu.Unlock() 1042 if s.conns == nil { 1043 st.Close(errors.New("Server.addConn called when server has already been stopped")) 1044 return false 1045 } 1046 if s.drain { 1047 // Transport added after we drained our existing conns: drain it 1048 // immediately. 1049 st.Drain() 1050 } 1051 1052 if s.conns[addr] == nil { 1053 // Create a map entry if this is the first connection on this listener. 1054 s.conns[addr] = make(map[transport.ServerTransport]bool) 1055 } 1056 s.conns[addr][st] = true 1057 return true 1058 } 1059 1060 func (s *Server) removeConn(addr string, st transport.ServerTransport) { 1061 s.mu.Lock() 1062 defer s.mu.Unlock() 1063 1064 conns := s.conns[addr] 1065 if conns != nil { 1066 delete(conns, st) 1067 if len(conns) == 0 { 1068 // If the last connection for this address is being removed, also 1069 // remove the map entry corresponding to the address. This is used 1070 // in GracefulStop() when waiting for all connections to be closed. 1071 delete(s.conns, addr) 1072 } 1073 s.cv.Broadcast() 1074 } 1075 } 1076 1077 func (s *Server) channelzMetric() *channelz.ServerInternalMetric { 1078 return &channelz.ServerInternalMetric{ 1079 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), 1080 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), 1081 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), 1082 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), 1083 } 1084 } 1085 1086 func (s *Server) incrCallsStarted() { 1087 atomic.AddInt64(&s.czData.callsStarted, 1) 1088 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) 1089 } 1090 1091 func (s *Server) incrCallsSucceeded() { 1092 atomic.AddInt64(&s.czData.callsSucceeded, 1) 1093 } 1094 1095 func (s *Server) incrCallsFailed() { 1096 atomic.AddInt64(&s.czData.callsFailed, 1) 1097 } 1098 1099 func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 1100 data, err := encode(s.getCodec(stream.ContentSubtype()), msg) 1101 if err != nil { 1102 channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) 1103 return err 1104 } 1105 compData, err := compress(data, cp, comp) 1106 if err != nil { 1107 channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err) 1108 return err 1109 } 1110 hdr, payload := msgHeader(data, compData) 1111 // TODO(dfawley): should we be checking len(data) instead? 1112 if len(payload) > s.opts.maxSendMessageSize { 1113 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) 1114 } 1115 err = t.Write(stream, hdr, payload, opts) 1116 if err == nil { 1117 for _, sh := range s.opts.statsHandlers { 1118 sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) 1119 } 1120 } 1121 return err 1122 } 1123 1124 // chainUnaryServerInterceptors chains all unary server interceptors into one. 1125 func chainUnaryServerInterceptors(s *Server) { 1126 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will 1127 // be executed before any other chained interceptors. 1128 interceptors := s.opts.chainUnaryInts 1129 if s.opts.unaryInt != nil { 1130 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...) 1131 } 1132 1133 var chainedInt UnaryServerInterceptor 1134 if len(interceptors) == 0 { 1135 chainedInt = nil 1136 } else if len(interceptors) == 1 { 1137 chainedInt = interceptors[0] 1138 } else { 1139 chainedInt = chainUnaryInterceptors(interceptors) 1140 } 1141 1142 s.opts.unaryInt = chainedInt 1143 } 1144 1145 func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { 1146 return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { 1147 return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler)) 1148 } 1149 } 1150 1151 func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler { 1152 if curr == len(interceptors)-1 { 1153 return finalHandler 1154 } 1155 return func(ctx context.Context, req interface{}) (interface{}, error) { 1156 return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler)) 1157 } 1158 } 1159 1160 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { 1161 shs := s.opts.statsHandlers 1162 if len(shs) != 0 || trInfo != nil || channelz.IsOn() { 1163 if channelz.IsOn() { 1164 s.incrCallsStarted() 1165 } 1166 var statsBegin *stats.Begin 1167 for _, sh := range shs { 1168 beginTime := time.Now() 1169 statsBegin = &stats.Begin{ 1170 BeginTime: beginTime, 1171 IsClientStream: false, 1172 IsServerStream: false, 1173 } 1174 sh.HandleRPC(stream.Context(), statsBegin) 1175 } 1176 if trInfo != nil { 1177 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1178 } 1179 // The deferred error handling for tracing, stats handler and channelz are 1180 // combined into one function to reduce stack usage -- a defer takes ~56-64 1181 // bytes on the stack, so overflowing the stack will require a stack 1182 // re-allocation, which is expensive. 1183 // 1184 // To maintain behavior similar to separate deferred statements, statements 1185 // should be executed in the reverse order. That is, tracing first, stats 1186 // handler second, and channelz last. Note that panics *within* defers will 1187 // lead to different behavior, but that's an acceptable compromise; that 1188 // would be undefined behavior territory anyway. 1189 defer func() { 1190 if trInfo != nil { 1191 if err != nil && err != io.EOF { 1192 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1193 trInfo.tr.SetError() 1194 } 1195 trInfo.tr.Finish() 1196 } 1197 1198 for _, sh := range shs { 1199 end := &stats.End{ 1200 BeginTime: statsBegin.BeginTime, 1201 EndTime: time.Now(), 1202 } 1203 if err != nil && err != io.EOF { 1204 end.Error = toRPCErr(err) 1205 } 1206 sh.HandleRPC(stream.Context(), end) 1207 } 1208 1209 if channelz.IsOn() { 1210 if err != nil && err != io.EOF { 1211 s.incrCallsFailed() 1212 } else { 1213 s.incrCallsSucceeded() 1214 } 1215 } 1216 }() 1217 } 1218 var binlogs []binarylog.MethodLogger 1219 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil { 1220 binlogs = append(binlogs, ml) 1221 } 1222 if s.opts.binaryLogger != nil { 1223 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil { 1224 binlogs = append(binlogs, ml) 1225 } 1226 } 1227 if len(binlogs) != 0 { 1228 ctx := stream.Context() 1229 md, _ := metadata.FromIncomingContext(ctx) 1230 logEntry := &binarylog.ClientHeader{ 1231 Header: md, 1232 MethodName: stream.Method(), 1233 PeerAddr: nil, 1234 } 1235 if deadline, ok := ctx.Deadline(); ok { 1236 logEntry.Timeout = time.Until(deadline) 1237 if logEntry.Timeout < 0 { 1238 logEntry.Timeout = 0 1239 } 1240 } 1241 if a := md[":authority"]; len(a) > 0 { 1242 logEntry.Authority = a[0] 1243 } 1244 if peer, ok := peer.FromContext(ctx); ok { 1245 logEntry.PeerAddr = peer.Addr 1246 } 1247 for _, binlog := range binlogs { 1248 binlog.Log(ctx, logEntry) 1249 } 1250 } 1251 1252 // comp and cp are used for compression. decomp and dc are used for 1253 // decompression. If comp and decomp are both set, they are the same; 1254 // however they are kept separate to ensure that at most one of the 1255 // compressor/decompressor variable pairs are set for use later. 1256 var comp, decomp encoding.Compressor 1257 var cp Compressor 1258 var dc Decompressor 1259 var sendCompressorName string 1260 1261 // If dc is set and matches the stream's compression, use it. Otherwise, try 1262 // to find a matching registered compressor for decomp. 1263 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1264 dc = s.opts.dc 1265 } else if rc != "" && rc != encoding.Identity { 1266 decomp = encoding.GetCompressor(rc) 1267 if decomp == nil { 1268 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1269 t.WriteStatus(stream, st) 1270 return st.Err() 1271 } 1272 } 1273 1274 // If cp is set, use it. Otherwise, attempt to compress the response using 1275 // the incoming message compression method. 1276 // 1277 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1278 if s.opts.cp != nil { 1279 cp = s.opts.cp 1280 sendCompressorName = cp.Type() 1281 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1282 // Legacy compressor not specified; attempt to respond with same encoding. 1283 comp = encoding.GetCompressor(rc) 1284 if comp != nil { 1285 sendCompressorName = comp.Name() 1286 } 1287 } 1288 1289 if sendCompressorName != "" { 1290 if err := stream.SetSendCompress(sendCompressorName); err != nil { 1291 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err) 1292 } 1293 } 1294 1295 var payInfo *payloadInfo 1296 if len(shs) != 0 || len(binlogs) != 0 { 1297 payInfo = &payloadInfo{} 1298 } 1299 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) 1300 if err != nil { 1301 if e := t.WriteStatus(stream, status.Convert(err)); e != nil { 1302 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1303 } 1304 return err 1305 } 1306 if channelz.IsOn() { 1307 t.IncrMsgRecv() 1308 } 1309 df := func(v interface{}) error { 1310 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { 1311 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 1312 } 1313 for _, sh := range shs { 1314 sh.HandleRPC(stream.Context(), &stats.InPayload{ 1315 RecvTime: time.Now(), 1316 Payload: v, 1317 Length: len(d), 1318 WireLength: payInfo.compressedLength + headerLen, 1319 CompressedLength: payInfo.compressedLength, 1320 Data: d, 1321 }) 1322 } 1323 if len(binlogs) != 0 { 1324 cm := &binarylog.ClientMessage{ 1325 Message: d, 1326 } 1327 for _, binlog := range binlogs { 1328 binlog.Log(stream.Context(), cm) 1329 } 1330 } 1331 if trInfo != nil { 1332 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1333 } 1334 return nil 1335 } 1336 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1337 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) 1338 if appErr != nil { 1339 appStatus, ok := status.FromError(appErr) 1340 if !ok { 1341 // Convert non-status application error to a status error with code 1342 // Unknown, but handle context errors specifically. 1343 appStatus = status.FromContextError(appErr) 1344 appErr = appStatus.Err() 1345 } 1346 if trInfo != nil { 1347 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1348 trInfo.tr.SetError() 1349 } 1350 if e := t.WriteStatus(stream, appStatus); e != nil { 1351 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1352 } 1353 if len(binlogs) != 0 { 1354 if h, _ := stream.Header(); h.Len() > 0 { 1355 // Only log serverHeader if there was header. Otherwise it can 1356 // be trailer only. 1357 sh := &binarylog.ServerHeader{ 1358 Header: h, 1359 } 1360 for _, binlog := range binlogs { 1361 binlog.Log(stream.Context(), sh) 1362 } 1363 } 1364 st := &binarylog.ServerTrailer{ 1365 Trailer: stream.Trailer(), 1366 Err: appErr, 1367 } 1368 for _, binlog := range binlogs { 1369 binlog.Log(stream.Context(), st) 1370 } 1371 } 1372 return appErr 1373 } 1374 if trInfo != nil { 1375 trInfo.tr.LazyLog(stringer("OK"), false) 1376 } 1377 opts := &transport.Options{Last: true} 1378 1379 // Server handler could have set new compressor by calling SetSendCompressor. 1380 // In case it is set, we need to use it for compressing outbound message. 1381 if stream.SendCompress() != sendCompressorName { 1382 comp = encoding.GetCompressor(stream.SendCompress()) 1383 } 1384 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1385 if err == io.EOF { 1386 // The entire stream is done (for unary RPC only). 1387 return err 1388 } 1389 if sts, ok := status.FromError(err); ok { 1390 if e := t.WriteStatus(stream, sts); e != nil { 1391 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) 1392 } 1393 } else { 1394 switch st := err.(type) { 1395 case transport.ConnectionError: 1396 // Nothing to do here. 1397 default: 1398 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1399 } 1400 } 1401 if len(binlogs) != 0 { 1402 h, _ := stream.Header() 1403 sh := &binarylog.ServerHeader{ 1404 Header: h, 1405 } 1406 st := &binarylog.ServerTrailer{ 1407 Trailer: stream.Trailer(), 1408 Err: appErr, 1409 } 1410 for _, binlog := range binlogs { 1411 binlog.Log(stream.Context(), sh) 1412 binlog.Log(stream.Context(), st) 1413 } 1414 } 1415 return err 1416 } 1417 if len(binlogs) != 0 { 1418 h, _ := stream.Header() 1419 sh := &binarylog.ServerHeader{ 1420 Header: h, 1421 } 1422 sm := &binarylog.ServerMessage{ 1423 Message: reply, 1424 } 1425 for _, binlog := range binlogs { 1426 binlog.Log(stream.Context(), sh) 1427 binlog.Log(stream.Context(), sm) 1428 } 1429 } 1430 if channelz.IsOn() { 1431 t.IncrMsgSent() 1432 } 1433 if trInfo != nil { 1434 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1435 } 1436 // TODO: Should we be logging if writing status failed here, like above? 1437 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1438 // error or allow the stats handler to see it? 1439 if len(binlogs) != 0 { 1440 st := &binarylog.ServerTrailer{ 1441 Trailer: stream.Trailer(), 1442 Err: appErr, 1443 } 1444 for _, binlog := range binlogs { 1445 binlog.Log(stream.Context(), st) 1446 } 1447 } 1448 return t.WriteStatus(stream, statusOK) 1449 } 1450 1451 // chainStreamServerInterceptors chains all stream server interceptors into one. 1452 func chainStreamServerInterceptors(s *Server) { 1453 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will 1454 // be executed before any other chained interceptors. 1455 interceptors := s.opts.chainStreamInts 1456 if s.opts.streamInt != nil { 1457 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...) 1458 } 1459 1460 var chainedInt StreamServerInterceptor 1461 if len(interceptors) == 0 { 1462 chainedInt = nil 1463 } else if len(interceptors) == 1 { 1464 chainedInt = interceptors[0] 1465 } else { 1466 chainedInt = chainStreamInterceptors(interceptors) 1467 } 1468 1469 s.opts.streamInt = chainedInt 1470 } 1471 1472 func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { 1473 return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { 1474 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler)) 1475 } 1476 } 1477 1478 func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler { 1479 if curr == len(interceptors)-1 { 1480 return finalHandler 1481 } 1482 return func(srv interface{}, stream ServerStream) error { 1483 return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler)) 1484 } 1485 } 1486 1487 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { 1488 if channelz.IsOn() { 1489 s.incrCallsStarted() 1490 } 1491 shs := s.opts.statsHandlers 1492 var statsBegin *stats.Begin 1493 if len(shs) != 0 { 1494 beginTime := time.Now() 1495 statsBegin = &stats.Begin{ 1496 BeginTime: beginTime, 1497 IsClientStream: sd.ClientStreams, 1498 IsServerStream: sd.ServerStreams, 1499 } 1500 for _, sh := range shs { 1501 sh.HandleRPC(stream.Context(), statsBegin) 1502 } 1503 } 1504 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1505 ss := &serverStream{ 1506 ctx: ctx, 1507 t: t, 1508 s: stream, 1509 p: &parser{r: stream}, 1510 codec: s.getCodec(stream.ContentSubtype()), 1511 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1512 maxSendMessageSize: s.opts.maxSendMessageSize, 1513 trInfo: trInfo, 1514 statsHandler: shs, 1515 } 1516 1517 if len(shs) != 0 || trInfo != nil || channelz.IsOn() { 1518 // See comment in processUnaryRPC on defers. 1519 defer func() { 1520 if trInfo != nil { 1521 ss.mu.Lock() 1522 if err != nil && err != io.EOF { 1523 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1524 ss.trInfo.tr.SetError() 1525 } 1526 ss.trInfo.tr.Finish() 1527 ss.trInfo.tr = nil 1528 ss.mu.Unlock() 1529 } 1530 1531 if len(shs) != 0 { 1532 end := &stats.End{ 1533 BeginTime: statsBegin.BeginTime, 1534 EndTime: time.Now(), 1535 } 1536 if err != nil && err != io.EOF { 1537 end.Error = toRPCErr(err) 1538 } 1539 for _, sh := range shs { 1540 sh.HandleRPC(stream.Context(), end) 1541 } 1542 } 1543 1544 if channelz.IsOn() { 1545 if err != nil && err != io.EOF { 1546 s.incrCallsFailed() 1547 } else { 1548 s.incrCallsSucceeded() 1549 } 1550 } 1551 }() 1552 } 1553 1554 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil { 1555 ss.binlogs = append(ss.binlogs, ml) 1556 } 1557 if s.opts.binaryLogger != nil { 1558 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil { 1559 ss.binlogs = append(ss.binlogs, ml) 1560 } 1561 } 1562 if len(ss.binlogs) != 0 { 1563 md, _ := metadata.FromIncomingContext(ctx) 1564 logEntry := &binarylog.ClientHeader{ 1565 Header: md, 1566 MethodName: stream.Method(), 1567 PeerAddr: nil, 1568 } 1569 if deadline, ok := ctx.Deadline(); ok { 1570 logEntry.Timeout = time.Until(deadline) 1571 if logEntry.Timeout < 0 { 1572 logEntry.Timeout = 0 1573 } 1574 } 1575 if a := md[":authority"]; len(a) > 0 { 1576 logEntry.Authority = a[0] 1577 } 1578 if peer, ok := peer.FromContext(ss.Context()); ok { 1579 logEntry.PeerAddr = peer.Addr 1580 } 1581 for _, binlog := range ss.binlogs { 1582 binlog.Log(stream.Context(), logEntry) 1583 } 1584 } 1585 1586 // If dc is set and matches the stream's compression, use it. Otherwise, try 1587 // to find a matching registered compressor for decomp. 1588 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1589 ss.dc = s.opts.dc 1590 } else if rc != "" && rc != encoding.Identity { 1591 ss.decomp = encoding.GetCompressor(rc) 1592 if ss.decomp == nil { 1593 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1594 t.WriteStatus(ss.s, st) 1595 return st.Err() 1596 } 1597 } 1598 1599 // If cp is set, use it. Otherwise, attempt to compress the response using 1600 // the incoming message compression method. 1601 // 1602 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1603 if s.opts.cp != nil { 1604 ss.cp = s.opts.cp 1605 ss.sendCompressorName = s.opts.cp.Type() 1606 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1607 // Legacy compressor not specified; attempt to respond with same encoding. 1608 ss.comp = encoding.GetCompressor(rc) 1609 if ss.comp != nil { 1610 ss.sendCompressorName = rc 1611 } 1612 } 1613 1614 if ss.sendCompressorName != "" { 1615 if err := stream.SetSendCompress(ss.sendCompressorName); err != nil { 1616 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err) 1617 } 1618 } 1619 1620 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp) 1621 1622 if trInfo != nil { 1623 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1624 } 1625 var appErr error 1626 var server interface{} 1627 if info != nil { 1628 server = info.serviceImpl 1629 } 1630 if s.opts.streamInt == nil { 1631 appErr = sd.Handler(server, ss) 1632 } else { 1633 info := &StreamServerInfo{ 1634 FullMethod: stream.Method(), 1635 IsClientStream: sd.ClientStreams, 1636 IsServerStream: sd.ServerStreams, 1637 } 1638 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1639 } 1640 if appErr != nil { 1641 appStatus, ok := status.FromError(appErr) 1642 if !ok { 1643 // Convert non-status application error to a status error with code 1644 // Unknown, but handle context errors specifically. 1645 appStatus = status.FromContextError(appErr) 1646 appErr = appStatus.Err() 1647 } 1648 if trInfo != nil { 1649 ss.mu.Lock() 1650 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1651 ss.trInfo.tr.SetError() 1652 ss.mu.Unlock() 1653 } 1654 if len(ss.binlogs) != 0 { 1655 st := &binarylog.ServerTrailer{ 1656 Trailer: ss.s.Trailer(), 1657 Err: appErr, 1658 } 1659 for _, binlog := range ss.binlogs { 1660 binlog.Log(stream.Context(), st) 1661 } 1662 } 1663 t.WriteStatus(ss.s, appStatus) 1664 // TODO: Should we log an error from WriteStatus here and below? 1665 return appErr 1666 } 1667 if trInfo != nil { 1668 ss.mu.Lock() 1669 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1670 ss.mu.Unlock() 1671 } 1672 if len(ss.binlogs) != 0 { 1673 st := &binarylog.ServerTrailer{ 1674 Trailer: ss.s.Trailer(), 1675 Err: appErr, 1676 } 1677 for _, binlog := range ss.binlogs { 1678 binlog.Log(stream.Context(), st) 1679 } 1680 } 1681 return t.WriteStatus(ss.s, statusOK) 1682 } 1683 1684 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1685 sm := stream.Method() 1686 if sm != "" && sm[0] == '/' { 1687 sm = sm[1:] 1688 } 1689 pos := strings.LastIndex(sm, "/") 1690 if pos == -1 { 1691 if trInfo != nil { 1692 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1693 trInfo.tr.SetError() 1694 } 1695 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1696 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1697 if trInfo != nil { 1698 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1699 trInfo.tr.SetError() 1700 } 1701 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1702 } 1703 if trInfo != nil { 1704 trInfo.tr.Finish() 1705 } 1706 return 1707 } 1708 service := sm[:pos] 1709 method := sm[pos+1:] 1710 1711 srv, knownService := s.services[service] 1712 if knownService { 1713 if md, ok := srv.methods[method]; ok { 1714 s.processUnaryRPC(t, stream, srv, md, trInfo) 1715 return 1716 } 1717 if sd, ok := srv.streams[method]; ok { 1718 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1719 return 1720 } 1721 } 1722 // Unknown service, or known server unknown method. 1723 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1724 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1725 return 1726 } 1727 var errDesc string 1728 if !knownService { 1729 errDesc = fmt.Sprintf("unknown service %v", service) 1730 } else { 1731 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service) 1732 } 1733 if trInfo != nil { 1734 trInfo.tr.LazyPrintf("%s", errDesc) 1735 trInfo.tr.SetError() 1736 } 1737 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1738 if trInfo != nil { 1739 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1740 trInfo.tr.SetError() 1741 } 1742 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) 1743 } 1744 if trInfo != nil { 1745 trInfo.tr.Finish() 1746 } 1747 } 1748 1749 // The key to save ServerTransportStream in the context. 1750 type streamKey struct{} 1751 1752 // NewContextWithServerTransportStream creates a new context from ctx and 1753 // attaches stream to it. 1754 // 1755 // # Experimental 1756 // 1757 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 1758 // later release. 1759 func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1760 return context.WithValue(ctx, streamKey{}, stream) 1761 } 1762 1763 // ServerTransportStream is a minimal interface that a transport stream must 1764 // implement. This can be used to mock an actual transport stream for tests of 1765 // handler code that use, for example, grpc.SetHeader (which requires some 1766 // stream to be in context). 1767 // 1768 // See also NewContextWithServerTransportStream. 1769 // 1770 // # Experimental 1771 // 1772 // Notice: This type is EXPERIMENTAL and may be changed or removed in a 1773 // later release. 1774 type ServerTransportStream interface { 1775 Method() string 1776 SetHeader(md metadata.MD) error 1777 SendHeader(md metadata.MD) error 1778 SetTrailer(md metadata.MD) error 1779 } 1780 1781 // ServerTransportStreamFromContext returns the ServerTransportStream saved in 1782 // ctx. Returns nil if the given context has no stream associated with it 1783 // (which implies it is not an RPC invocation context). 1784 // 1785 // # Experimental 1786 // 1787 // Notice: This API is EXPERIMENTAL and may be changed or removed in a 1788 // later release. 1789 func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1790 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1791 return s 1792 } 1793 1794 // Stop stops the gRPC server. It immediately closes all open 1795 // connections and listeners. 1796 // It cancels all active RPCs on the server side and the corresponding 1797 // pending RPCs on the client side will get notified by connection 1798 // errors. 1799 func (s *Server) Stop() { 1800 s.quit.Fire() 1801 1802 defer func() { 1803 s.serveWG.Wait() 1804 s.done.Fire() 1805 }() 1806 1807 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) 1808 1809 s.mu.Lock() 1810 listeners := s.lis 1811 s.lis = nil 1812 conns := s.conns 1813 s.conns = nil 1814 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1815 s.cv.Broadcast() 1816 s.mu.Unlock() 1817 1818 for lis := range listeners { 1819 lis.Close() 1820 } 1821 for _, cs := range conns { 1822 for st := range cs { 1823 st.Close(errors.New("Server.Stop called")) 1824 } 1825 } 1826 if s.opts.numServerWorkers > 0 { 1827 s.stopServerWorkers() 1828 } 1829 1830 s.mu.Lock() 1831 if s.events != nil { 1832 s.events.Finish() 1833 s.events = nil 1834 } 1835 s.mu.Unlock() 1836 } 1837 1838 // GracefulStop stops the gRPC server gracefully. It stops the server from 1839 // accepting new connections and RPCs and blocks until all the pending RPCs are 1840 // finished. 1841 func (s *Server) GracefulStop() { 1842 s.quit.Fire() 1843 defer s.done.Fire() 1844 1845 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) 1846 s.mu.Lock() 1847 if s.conns == nil { 1848 s.mu.Unlock() 1849 return 1850 } 1851 1852 for lis := range s.lis { 1853 lis.Close() 1854 } 1855 s.lis = nil 1856 if !s.drain { 1857 for _, conns := range s.conns { 1858 for st := range conns { 1859 st.Drain() 1860 } 1861 } 1862 s.drain = true 1863 } 1864 1865 // Wait for serving threads to be ready to exit. Only then can we be sure no 1866 // new conns will be created. 1867 s.mu.Unlock() 1868 s.serveWG.Wait() 1869 s.mu.Lock() 1870 1871 for len(s.conns) != 0 { 1872 s.cv.Wait() 1873 } 1874 s.conns = nil 1875 if s.events != nil { 1876 s.events.Finish() 1877 s.events = nil 1878 } 1879 s.mu.Unlock() 1880 } 1881 1882 // contentSubtype must be lowercase 1883 // cannot return nil 1884 func (s *Server) getCodec(contentSubtype string) baseCodec { 1885 if s.opts.codec != nil { 1886 return s.opts.codec 1887 } 1888 if contentSubtype == "" { 1889 return encoding.GetCodec(proto.Name) 1890 } 1891 codec := encoding.GetCodec(contentSubtype) 1892 if codec == nil { 1893 return encoding.GetCodec(proto.Name) 1894 } 1895 return codec 1896 } 1897 1898 // SetHeader sets the header metadata to be sent from the server to the client. 1899 // The context provided must be the context passed to the server's handler. 1900 // 1901 // Streaming RPCs should prefer the SetHeader method of the ServerStream. 1902 // 1903 // When called multiple times, all the provided metadata will be merged. All 1904 // the metadata will be sent out when one of the following happens: 1905 // 1906 // - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader. 1907 // - The first response message is sent. For unary handlers, this occurs when 1908 // the handler returns; for streaming handlers, this can happen when stream's 1909 // SendMsg method is called. 1910 // - An RPC status is sent out (error or success). This occurs when the handler 1911 // returns. 1912 // 1913 // SetHeader will fail if called after any of the events above. 1914 // 1915 // The error returned is compatible with the status package. However, the 1916 // status code will often not match the RPC status as seen by the client 1917 // application, and therefore, should not be relied upon for this purpose. 1918 func SetHeader(ctx context.Context, md metadata.MD) error { 1919 if md.Len() == 0 { 1920 return nil 1921 } 1922 stream := ServerTransportStreamFromContext(ctx) 1923 if stream == nil { 1924 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1925 } 1926 return stream.SetHeader(md) 1927 } 1928 1929 // SendHeader sends header metadata. It may be called at most once, and may not 1930 // be called after any event that causes headers to be sent (see SetHeader for 1931 // a complete list). The provided md and headers set by SetHeader() will be 1932 // sent. 1933 // 1934 // The error returned is compatible with the status package. However, the 1935 // status code will often not match the RPC status as seen by the client 1936 // application, and therefore, should not be relied upon for this purpose. 1937 func SendHeader(ctx context.Context, md metadata.MD) error { 1938 stream := ServerTransportStreamFromContext(ctx) 1939 if stream == nil { 1940 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1941 } 1942 if err := stream.SendHeader(md); err != nil { 1943 return toRPCErr(err) 1944 } 1945 return nil 1946 } 1947 1948 // SetSendCompressor sets a compressor for outbound messages from the server. 1949 // It must not be called after any event that causes headers to be sent 1950 // (see ServerStream.SetHeader for the complete list). Provided compressor is 1951 // used when below conditions are met: 1952 // 1953 // - compressor is registered via encoding.RegisterCompressor 1954 // - compressor name must exist in the client advertised compressor names 1955 // sent in grpc-accept-encoding header. Use ClientSupportedCompressors to 1956 // get client supported compressor names. 1957 // 1958 // The context provided must be the context passed to the server's handler. 1959 // It must be noted that compressor name encoding.Identity disables the 1960 // outbound compression. 1961 // By default, server messages will be sent using the same compressor with 1962 // which request messages were sent. 1963 // 1964 // It is not safe to call SetSendCompressor concurrently with SendHeader and 1965 // SendMsg. 1966 // 1967 // # Experimental 1968 // 1969 // Notice: This function is EXPERIMENTAL and may be changed or removed in a 1970 // later release. 1971 func SetSendCompressor(ctx context.Context, name string) error { 1972 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) 1973 if !ok || stream == nil { 1974 return fmt.Errorf("failed to fetch the stream from the given context") 1975 } 1976 1977 if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil { 1978 return fmt.Errorf("unable to set send compressor: %w", err) 1979 } 1980 1981 return stream.SetSendCompress(name) 1982 } 1983 1984 // ClientSupportedCompressors returns compressor names advertised by the client 1985 // via grpc-accept-encoding header. 1986 // 1987 // The context provided must be the context passed to the server's handler. 1988 // 1989 // # Experimental 1990 // 1991 // Notice: This function is EXPERIMENTAL and may be changed or removed in a 1992 // later release. 1993 func ClientSupportedCompressors(ctx context.Context) ([]string, error) { 1994 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) 1995 if !ok || stream == nil { 1996 return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx) 1997 } 1998 1999 return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil 2000 } 2001 2002 // SetTrailer sets the trailer metadata that will be sent when an RPC returns. 2003 // When called more than once, all the provided metadata will be merged. 2004 // 2005 // The error returned is compatible with the status package. However, the 2006 // status code will often not match the RPC status as seen by the client 2007 // application, and therefore, should not be relied upon for this purpose. 2008 func SetTrailer(ctx context.Context, md metadata.MD) error { 2009 if md.Len() == 0 { 2010 return nil 2011 } 2012 stream := ServerTransportStreamFromContext(ctx) 2013 if stream == nil { 2014 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 2015 } 2016 return stream.SetTrailer(md) 2017 } 2018 2019 // Method returns the method string for the server context. The returned 2020 // string is in the format of "/service/method". 2021 func Method(ctx context.Context) (string, bool) { 2022 s := ServerTransportStreamFromContext(ctx) 2023 if s == nil { 2024 return "", false 2025 } 2026 return s.Method(), true 2027 } 2028 2029 type channelzServer struct { 2030 s *Server 2031 } 2032 2033 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { 2034 return c.s.channelzMetric() 2035 } 2036 2037 // validateSendCompressor returns an error when given compressor name cannot be 2038 // handled by the server or the client based on the advertised compressors. 2039 func validateSendCompressor(name, clientCompressors string) error { 2040 if name == encoding.Identity { 2041 return nil 2042 } 2043 2044 if !grpcutil.IsCompressorNameRegistered(name) { 2045 return fmt.Errorf("compressor not registered %q", name) 2046 } 2047 2048 for _, c := range strings.Split(clientCompressors, ",") { 2049 if c == name { 2050 return nil // found match 2051 } 2052 } 2053 return fmt.Errorf("client does not support compressor %q", name) 2054 }