controlbuf.go (27344B)
1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package transport 20 21 import ( 22 "bytes" 23 "errors" 24 "fmt" 25 "net" 26 "runtime" 27 "strconv" 28 "sync" 29 "sync/atomic" 30 31 "golang.org/x/net/http2" 32 "golang.org/x/net/http2/hpack" 33 "google.golang.org/grpc/internal/grpclog" 34 "google.golang.org/grpc/internal/grpcutil" 35 "google.golang.org/grpc/status" 36 ) 37 38 var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { 39 e.SetMaxDynamicTableSizeLimit(v) 40 } 41 42 type itemNode struct { 43 it interface{} 44 next *itemNode 45 } 46 47 type itemList struct { 48 head *itemNode 49 tail *itemNode 50 } 51 52 func (il *itemList) enqueue(i interface{}) { 53 n := &itemNode{it: i} 54 if il.tail == nil { 55 il.head, il.tail = n, n 56 return 57 } 58 il.tail.next = n 59 il.tail = n 60 } 61 62 // peek returns the first item in the list without removing it from the 63 // list. 64 func (il *itemList) peek() interface{} { 65 return il.head.it 66 } 67 68 func (il *itemList) dequeue() interface{} { 69 if il.head == nil { 70 return nil 71 } 72 i := il.head.it 73 il.head = il.head.next 74 if il.head == nil { 75 il.tail = nil 76 } 77 return i 78 } 79 80 func (il *itemList) dequeueAll() *itemNode { 81 h := il.head 82 il.head, il.tail = nil, nil 83 return h 84 } 85 86 func (il *itemList) isEmpty() bool { 87 return il.head == nil 88 } 89 90 // The following defines various control items which could flow through 91 // the control buffer of transport. They represent different aspects of 92 // control tasks, e.g., flow control, settings, streaming resetting, etc. 93 94 // maxQueuedTransportResponseFrames is the most queued "transport response" 95 // frames we will buffer before preventing new reads from occurring on the 96 // transport. These are control frames sent in response to client requests, 97 // such as RST_STREAM due to bad headers or settings acks. 98 const maxQueuedTransportResponseFrames = 50 99 100 type cbItem interface { 101 isTransportResponseFrame() bool 102 } 103 104 // registerStream is used to register an incoming stream with loopy writer. 105 type registerStream struct { 106 streamID uint32 107 wq *writeQuota 108 } 109 110 func (*registerStream) isTransportResponseFrame() bool { return false } 111 112 // headerFrame is also used to register stream on the client-side. 113 type headerFrame struct { 114 streamID uint32 115 hf []hpack.HeaderField 116 endStream bool // Valid on server side. 117 initStream func(uint32) error // Used only on the client side. 118 onWrite func() 119 wq *writeQuota // write quota for the stream created. 120 cleanup *cleanupStream // Valid on the server side. 121 onOrphaned func(error) // Valid on client-side 122 } 123 124 func (h *headerFrame) isTransportResponseFrame() bool { 125 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM 126 } 127 128 type cleanupStream struct { 129 streamID uint32 130 rst bool 131 rstCode http2.ErrCode 132 onWrite func() 133 } 134 135 func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM 136 137 type earlyAbortStream struct { 138 httpStatus uint32 139 streamID uint32 140 contentSubtype string 141 status *status.Status 142 rst bool 143 } 144 145 func (*earlyAbortStream) isTransportResponseFrame() bool { return false } 146 147 type dataFrame struct { 148 streamID uint32 149 endStream bool 150 h []byte 151 d []byte 152 // onEachWrite is called every time 153 // a part of d is written out. 154 onEachWrite func() 155 } 156 157 func (*dataFrame) isTransportResponseFrame() bool { return false } 158 159 type incomingWindowUpdate struct { 160 streamID uint32 161 increment uint32 162 } 163 164 func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false } 165 166 type outgoingWindowUpdate struct { 167 streamID uint32 168 increment uint32 169 } 170 171 func (*outgoingWindowUpdate) isTransportResponseFrame() bool { 172 return false // window updates are throttled by thresholds 173 } 174 175 type incomingSettings struct { 176 ss []http2.Setting 177 } 178 179 func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK 180 181 type outgoingSettings struct { 182 ss []http2.Setting 183 } 184 185 func (*outgoingSettings) isTransportResponseFrame() bool { return false } 186 187 type incomingGoAway struct { 188 } 189 190 func (*incomingGoAway) isTransportResponseFrame() bool { return false } 191 192 type goAway struct { 193 code http2.ErrCode 194 debugData []byte 195 headsUp bool 196 closeConn error // if set, loopyWriter will exit, resulting in conn closure 197 } 198 199 func (*goAway) isTransportResponseFrame() bool { return false } 200 201 type ping struct { 202 ack bool 203 data [8]byte 204 } 205 206 func (*ping) isTransportResponseFrame() bool { return true } 207 208 type outFlowControlSizeRequest struct { 209 resp chan uint32 210 } 211 212 func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false } 213 214 // closeConnection is an instruction to tell the loopy writer to flush the 215 // framer and exit, which will cause the transport's connection to be closed 216 // (by the client or server). The transport itself will close after the reader 217 // encounters the EOF caused by the connection closure. 218 type closeConnection struct{} 219 220 func (closeConnection) isTransportResponseFrame() bool { return false } 221 222 type outStreamState int 223 224 const ( 225 active outStreamState = iota 226 empty 227 waitingOnStreamQuota 228 ) 229 230 type outStream struct { 231 id uint32 232 state outStreamState 233 itl *itemList 234 bytesOutStanding int 235 wq *writeQuota 236 237 next *outStream 238 prev *outStream 239 } 240 241 func (s *outStream) deleteSelf() { 242 if s.prev != nil { 243 s.prev.next = s.next 244 } 245 if s.next != nil { 246 s.next.prev = s.prev 247 } 248 s.next, s.prev = nil, nil 249 } 250 251 type outStreamList struct { 252 // Following are sentinel objects that mark the 253 // beginning and end of the list. They do not 254 // contain any item lists. All valid objects are 255 // inserted in between them. 256 // This is needed so that an outStream object can 257 // deleteSelf() in O(1) time without knowing which 258 // list it belongs to. 259 head *outStream 260 tail *outStream 261 } 262 263 func newOutStreamList() *outStreamList { 264 head, tail := new(outStream), new(outStream) 265 head.next = tail 266 tail.prev = head 267 return &outStreamList{ 268 head: head, 269 tail: tail, 270 } 271 } 272 273 func (l *outStreamList) enqueue(s *outStream) { 274 e := l.tail.prev 275 e.next = s 276 s.prev = e 277 s.next = l.tail 278 l.tail.prev = s 279 } 280 281 // remove from the beginning of the list. 282 func (l *outStreamList) dequeue() *outStream { 283 b := l.head.next 284 if b == l.tail { 285 return nil 286 } 287 b.deleteSelf() 288 return b 289 } 290 291 // controlBuffer is a way to pass information to loopy. 292 // Information is passed as specific struct types called control frames. 293 // A control frame not only represents data, messages or headers to be sent out 294 // but can also be used to instruct loopy to update its internal state. 295 // It shouldn't be confused with an HTTP2 frame, although some of the control frames 296 // like dataFrame and headerFrame do go out on wire as HTTP2 frames. 297 type controlBuffer struct { 298 ch chan struct{} 299 done <-chan struct{} 300 mu sync.Mutex 301 consumerWaiting bool 302 list *itemList 303 err error 304 305 // transportResponseFrames counts the number of queued items that represent 306 // the response of an action initiated by the peer. trfChan is created 307 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is 308 // closed and nilled when transportResponseFrames drops below the 309 // threshold. Both fields are protected by mu. 310 transportResponseFrames int 311 trfChan atomic.Value // chan struct{} 312 } 313 314 func newControlBuffer(done <-chan struct{}) *controlBuffer { 315 return &controlBuffer{ 316 ch: make(chan struct{}, 1), 317 list: &itemList{}, 318 done: done, 319 } 320 } 321 322 // throttle blocks if there are too many incomingSettings/cleanupStreams in the 323 // controlbuf. 324 func (c *controlBuffer) throttle() { 325 ch, _ := c.trfChan.Load().(chan struct{}) 326 if ch != nil { 327 select { 328 case <-ch: 329 case <-c.done: 330 } 331 } 332 } 333 334 func (c *controlBuffer) put(it cbItem) error { 335 _, err := c.executeAndPut(nil, it) 336 return err 337 } 338 339 func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) { 340 var wakeUp bool 341 c.mu.Lock() 342 if c.err != nil { 343 c.mu.Unlock() 344 return false, c.err 345 } 346 if f != nil { 347 if !f(it) { // f wasn't successful 348 c.mu.Unlock() 349 return false, nil 350 } 351 } 352 if c.consumerWaiting { 353 wakeUp = true 354 c.consumerWaiting = false 355 } 356 c.list.enqueue(it) 357 if it.isTransportResponseFrame() { 358 c.transportResponseFrames++ 359 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 360 // We are adding the frame that puts us over the threshold; create 361 // a throttling channel. 362 c.trfChan.Store(make(chan struct{})) 363 } 364 } 365 c.mu.Unlock() 366 if wakeUp { 367 select { 368 case c.ch <- struct{}{}: 369 default: 370 } 371 } 372 return true, nil 373 } 374 375 // Note argument f should never be nil. 376 func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { 377 c.mu.Lock() 378 if c.err != nil { 379 c.mu.Unlock() 380 return false, c.err 381 } 382 if !f(it) { // f wasn't successful 383 c.mu.Unlock() 384 return false, nil 385 } 386 c.mu.Unlock() 387 return true, nil 388 } 389 390 func (c *controlBuffer) get(block bool) (interface{}, error) { 391 for { 392 c.mu.Lock() 393 if c.err != nil { 394 c.mu.Unlock() 395 return nil, c.err 396 } 397 if !c.list.isEmpty() { 398 h := c.list.dequeue().(cbItem) 399 if h.isTransportResponseFrame() { 400 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 401 // We are removing the frame that put us over the 402 // threshold; close and clear the throttling channel. 403 ch := c.trfChan.Load().(chan struct{}) 404 close(ch) 405 c.trfChan.Store((chan struct{})(nil)) 406 } 407 c.transportResponseFrames-- 408 } 409 c.mu.Unlock() 410 return h, nil 411 } 412 if !block { 413 c.mu.Unlock() 414 return nil, nil 415 } 416 c.consumerWaiting = true 417 c.mu.Unlock() 418 select { 419 case <-c.ch: 420 case <-c.done: 421 return nil, errors.New("transport closed by client") 422 } 423 } 424 } 425 426 func (c *controlBuffer) finish() { 427 c.mu.Lock() 428 if c.err != nil { 429 c.mu.Unlock() 430 return 431 } 432 c.err = ErrConnClosing 433 // There may be headers for streams in the control buffer. 434 // These streams need to be cleaned out since the transport 435 // is still not aware of these yet. 436 for head := c.list.dequeueAll(); head != nil; head = head.next { 437 hdr, ok := head.it.(*headerFrame) 438 if !ok { 439 continue 440 } 441 if hdr.onOrphaned != nil { // It will be nil on the server-side. 442 hdr.onOrphaned(ErrConnClosing) 443 } 444 } 445 // In case throttle() is currently in flight, it needs to be unblocked. 446 // Otherwise, the transport may not close, since the transport is closed by 447 // the reader encountering the connection error. 448 ch, _ := c.trfChan.Load().(chan struct{}) 449 if ch != nil { 450 close(ch) 451 } 452 c.trfChan.Store((chan struct{})(nil)) 453 c.mu.Unlock() 454 } 455 456 type side int 457 458 const ( 459 clientSide side = iota 460 serverSide 461 ) 462 463 // Loopy receives frames from the control buffer. 464 // Each frame is handled individually; most of the work done by loopy goes 465 // into handling data frames. Loopy maintains a queue of active streams, and each 466 // stream maintains a queue of data frames; as loopy receives data frames 467 // it gets added to the queue of the relevant stream. 468 // Loopy goes over this list of active streams by processing one node every iteration, 469 // thereby closely resemebling to a round-robin scheduling over all streams. While 470 // processing a stream, loopy writes out data bytes from this stream capped by the min 471 // of http2MaxFrameLen, connection-level flow control and stream-level flow control. 472 type loopyWriter struct { 473 side side 474 cbuf *controlBuffer 475 sendQuota uint32 476 oiws uint32 // outbound initial window size. 477 // estdStreams is map of all established streams that are not cleaned-up yet. 478 // On client-side, this is all streams whose headers were sent out. 479 // On server-side, this is all streams whose headers were received. 480 estdStreams map[uint32]*outStream // Established streams. 481 // activeStreams is a linked-list of all streams that have data to send and some 482 // stream-level flow control quota. 483 // Each of these streams internally have a list of data items(and perhaps trailers 484 // on the server-side) to be sent out. 485 activeStreams *outStreamList 486 framer *framer 487 hBuf *bytes.Buffer // The buffer for HPACK encoding. 488 hEnc *hpack.Encoder // HPACK encoder. 489 bdpEst *bdpEstimator 490 draining bool 491 conn net.Conn 492 logger *grpclog.PrefixLogger 493 494 // Side-specific handlers 495 ssGoAwayHandler func(*goAway) (bool, error) 496 } 497 498 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter { 499 var buf bytes.Buffer 500 l := &loopyWriter{ 501 side: s, 502 cbuf: cbuf, 503 sendQuota: defaultWindowSize, 504 oiws: defaultWindowSize, 505 estdStreams: make(map[uint32]*outStream), 506 activeStreams: newOutStreamList(), 507 framer: fr, 508 hBuf: &buf, 509 hEnc: hpack.NewEncoder(&buf), 510 bdpEst: bdpEst, 511 conn: conn, 512 logger: logger, 513 } 514 return l 515 } 516 517 const minBatchSize = 1000 518 519 // run should be run in a separate goroutine. 520 // It reads control frames from controlBuf and processes them by: 521 // 1. Updating loopy's internal state, or/and 522 // 2. Writing out HTTP2 frames on the wire. 523 // 524 // Loopy keeps all active streams with data to send in a linked-list. 525 // All streams in the activeStreams linked-list must have both: 526 // 1. Data to send, and 527 // 2. Stream level flow control quota available. 528 // 529 // In each iteration of run loop, other than processing the incoming control 530 // frame, loopy calls processData, which processes one node from the 531 // activeStreams linked-list. This results in writing of HTTP2 frames into an 532 // underlying write buffer. When there's no more control frames to read from 533 // controlBuf, loopy flushes the write buffer. As an optimization, to increase 534 // the batch size for each flush, loopy yields the processor, once if the batch 535 // size is too low to give stream goroutines a chance to fill it up. 536 // 537 // Upon exiting, if the error causing the exit is not an I/O error, run() 538 // flushes and closes the underlying connection. Otherwise, the connection is 539 // left open to allow the I/O error to be encountered by the reader instead. 540 func (l *loopyWriter) run() (err error) { 541 defer func() { 542 if l.logger.V(logLevel) { 543 l.logger.Infof("loopyWriter exiting with error: %v", err) 544 } 545 if !isIOError(err) { 546 l.framer.writer.Flush() 547 l.conn.Close() 548 } 549 l.cbuf.finish() 550 }() 551 for { 552 it, err := l.cbuf.get(true) 553 if err != nil { 554 return err 555 } 556 if err = l.handle(it); err != nil { 557 return err 558 } 559 if _, err = l.processData(); err != nil { 560 return err 561 } 562 gosched := true 563 hasdata: 564 for { 565 it, err := l.cbuf.get(false) 566 if err != nil { 567 return err 568 } 569 if it != nil { 570 if err = l.handle(it); err != nil { 571 return err 572 } 573 if _, err = l.processData(); err != nil { 574 return err 575 } 576 continue hasdata 577 } 578 isEmpty, err := l.processData() 579 if err != nil { 580 return err 581 } 582 if !isEmpty { 583 continue hasdata 584 } 585 if gosched { 586 gosched = false 587 if l.framer.writer.offset < minBatchSize { 588 runtime.Gosched() 589 continue hasdata 590 } 591 } 592 l.framer.writer.Flush() 593 break hasdata 594 } 595 } 596 } 597 598 func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error { 599 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment) 600 } 601 602 func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) { 603 // Otherwise update the quota. 604 if w.streamID == 0 { 605 l.sendQuota += w.increment 606 return 607 } 608 // Find the stream and update it. 609 if str, ok := l.estdStreams[w.streamID]; ok { 610 str.bytesOutStanding -= int(w.increment) 611 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { 612 str.state = active 613 l.activeStreams.enqueue(str) 614 return 615 } 616 } 617 } 618 619 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error { 620 return l.framer.fr.WriteSettings(s.ss...) 621 } 622 623 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { 624 l.applySettings(s.ss) 625 return l.framer.fr.WriteSettingsAck() 626 } 627 628 func (l *loopyWriter) registerStreamHandler(h *registerStream) { 629 str := &outStream{ 630 id: h.streamID, 631 state: empty, 632 itl: &itemList{}, 633 wq: h.wq, 634 } 635 l.estdStreams[h.streamID] = str 636 } 637 638 func (l *loopyWriter) headerHandler(h *headerFrame) error { 639 if l.side == serverSide { 640 str, ok := l.estdStreams[h.streamID] 641 if !ok { 642 if l.logger.V(logLevel) { 643 l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID) 644 } 645 return nil 646 } 647 // Case 1.A: Server is responding back with headers. 648 if !h.endStream { 649 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite) 650 } 651 // else: Case 1.B: Server wants to close stream. 652 653 if str.state != empty { // either active or waiting on stream quota. 654 // add it str's list of items. 655 str.itl.enqueue(h) 656 return nil 657 } 658 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil { 659 return err 660 } 661 return l.cleanupStreamHandler(h.cleanup) 662 } 663 // Case 2: Client wants to originate stream. 664 str := &outStream{ 665 id: h.streamID, 666 state: empty, 667 itl: &itemList{}, 668 wq: h.wq, 669 } 670 return l.originateStream(str, h) 671 } 672 673 func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error { 674 // l.draining is set when handling GoAway. In which case, we want to avoid 675 // creating new streams. 676 if l.draining { 677 // TODO: provide a better error with the reason we are in draining. 678 hdr.onOrphaned(errStreamDrain) 679 return nil 680 } 681 if err := hdr.initStream(str.id); err != nil { 682 return err 683 } 684 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { 685 return err 686 } 687 l.estdStreams[str.id] = str 688 return nil 689 } 690 691 func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { 692 if onWrite != nil { 693 onWrite() 694 } 695 l.hBuf.Reset() 696 for _, f := range hf { 697 if err := l.hEnc.WriteField(f); err != nil { 698 if l.logger.V(logLevel) { 699 l.logger.Warningf("Encountered error while encoding headers: %v", err) 700 } 701 } 702 } 703 var ( 704 err error 705 endHeaders, first bool 706 ) 707 first = true 708 for !endHeaders { 709 size := l.hBuf.Len() 710 if size > http2MaxFrameLen { 711 size = http2MaxFrameLen 712 } else { 713 endHeaders = true 714 } 715 if first { 716 first = false 717 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ 718 StreamID: streamID, 719 BlockFragment: l.hBuf.Next(size), 720 EndStream: endStream, 721 EndHeaders: endHeaders, 722 }) 723 } else { 724 err = l.framer.fr.WriteContinuation( 725 streamID, 726 endHeaders, 727 l.hBuf.Next(size), 728 ) 729 } 730 if err != nil { 731 return err 732 } 733 } 734 return nil 735 } 736 737 func (l *loopyWriter) preprocessData(df *dataFrame) { 738 str, ok := l.estdStreams[df.streamID] 739 if !ok { 740 return 741 } 742 // If we got data for a stream it means that 743 // stream was originated and the headers were sent out. 744 str.itl.enqueue(df) 745 if str.state == empty { 746 str.state = active 747 l.activeStreams.enqueue(str) 748 } 749 } 750 751 func (l *loopyWriter) pingHandler(p *ping) error { 752 if !p.ack { 753 l.bdpEst.timesnap(p.data) 754 } 755 return l.framer.fr.WritePing(p.ack, p.data) 756 757 } 758 759 func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) { 760 o.resp <- l.sendQuota 761 } 762 763 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { 764 c.onWrite() 765 if str, ok := l.estdStreams[c.streamID]; ok { 766 // On the server side it could be a trailers-only response or 767 // a RST_STREAM before stream initialization thus the stream might 768 // not be established yet. 769 delete(l.estdStreams, c.streamID) 770 str.deleteSelf() 771 } 772 if c.rst { // If RST_STREAM needs to be sent. 773 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil { 774 return err 775 } 776 } 777 if l.draining && len(l.estdStreams) == 0 { 778 // Flush and close the connection; we are done with it. 779 return errors.New("finished processing active streams while in draining mode") 780 } 781 return nil 782 } 783 784 func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { 785 if l.side == clientSide { 786 return errors.New("earlyAbortStream not handled on client") 787 } 788 // In case the caller forgets to set the http status, default to 200. 789 if eas.httpStatus == 0 { 790 eas.httpStatus = 200 791 } 792 headerFields := []hpack.HeaderField{ 793 {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))}, 794 {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)}, 795 {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))}, 796 {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())}, 797 } 798 799 if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil { 800 return err 801 } 802 if eas.rst { 803 if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil { 804 return err 805 } 806 } 807 return nil 808 } 809 810 func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { 811 if l.side == clientSide { 812 l.draining = true 813 if len(l.estdStreams) == 0 { 814 // Flush and close the connection; we are done with it. 815 return errors.New("received GOAWAY with no active streams") 816 } 817 } 818 return nil 819 } 820 821 func (l *loopyWriter) goAwayHandler(g *goAway) error { 822 // Handling of outgoing GoAway is very specific to side. 823 if l.ssGoAwayHandler != nil { 824 draining, err := l.ssGoAwayHandler(g) 825 if err != nil { 826 return err 827 } 828 l.draining = draining 829 } 830 return nil 831 } 832 833 func (l *loopyWriter) handle(i interface{}) error { 834 switch i := i.(type) { 835 case *incomingWindowUpdate: 836 l.incomingWindowUpdateHandler(i) 837 case *outgoingWindowUpdate: 838 return l.outgoingWindowUpdateHandler(i) 839 case *incomingSettings: 840 return l.incomingSettingsHandler(i) 841 case *outgoingSettings: 842 return l.outgoingSettingsHandler(i) 843 case *headerFrame: 844 return l.headerHandler(i) 845 case *registerStream: 846 l.registerStreamHandler(i) 847 case *cleanupStream: 848 return l.cleanupStreamHandler(i) 849 case *earlyAbortStream: 850 return l.earlyAbortStreamHandler(i) 851 case *incomingGoAway: 852 return l.incomingGoAwayHandler(i) 853 case *dataFrame: 854 l.preprocessData(i) 855 case *ping: 856 return l.pingHandler(i) 857 case *goAway: 858 return l.goAwayHandler(i) 859 case *outFlowControlSizeRequest: 860 l.outFlowControlSizeRequestHandler(i) 861 case closeConnection: 862 // Just return a non-I/O error and run() will flush and close the 863 // connection. 864 return ErrConnClosing 865 default: 866 return fmt.Errorf("transport: unknown control message type %T", i) 867 } 868 return nil 869 } 870 871 func (l *loopyWriter) applySettings(ss []http2.Setting) { 872 for _, s := range ss { 873 switch s.ID { 874 case http2.SettingInitialWindowSize: 875 o := l.oiws 876 l.oiws = s.Val 877 if o < l.oiws { 878 // If the new limit is greater make all depleted streams active. 879 for _, stream := range l.estdStreams { 880 if stream.state == waitingOnStreamQuota { 881 stream.state = active 882 l.activeStreams.enqueue(stream) 883 } 884 } 885 } 886 case http2.SettingHeaderTableSize: 887 updateHeaderTblSize(l.hEnc, s.Val) 888 } 889 } 890 } 891 892 // processData removes the first stream from active streams, writes out at most 16KB 893 // of its data and then puts it at the end of activeStreams if there's still more data 894 // to be sent and stream has some stream-level flow control. 895 func (l *loopyWriter) processData() (bool, error) { 896 if l.sendQuota == 0 { 897 return true, nil 898 } 899 str := l.activeStreams.dequeue() // Remove the first stream. 900 if str == nil { 901 return true, nil 902 } 903 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. 904 // A data item is represented by a dataFrame, since it later translates into 905 // multiple HTTP2 data frames. 906 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data. 907 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the 908 // maximum possible HTTP2 frame size. 909 910 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame 911 // Client sends out empty data frame with endStream = true 912 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { 913 return false, err 914 } 915 str.itl.dequeue() // remove the empty data item from stream 916 if str.itl.isEmpty() { 917 str.state = empty 918 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. 919 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 920 return false, err 921 } 922 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 923 return false, err 924 } 925 } else { 926 l.activeStreams.enqueue(str) 927 } 928 return false, nil 929 } 930 var ( 931 buf []byte 932 ) 933 // Figure out the maximum size we can send 934 maxSize := http2MaxFrameLen 935 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. 936 str.state = waitingOnStreamQuota 937 return false, nil 938 } else if maxSize > strQuota { 939 maxSize = strQuota 940 } 941 if maxSize > int(l.sendQuota) { // connection-level flow control. 942 maxSize = int(l.sendQuota) 943 } 944 // Compute how much of the header and data we can send within quota and max frame length 945 hSize := min(maxSize, len(dataItem.h)) 946 dSize := min(maxSize-hSize, len(dataItem.d)) 947 if hSize != 0 { 948 if dSize == 0 { 949 buf = dataItem.h 950 } else { 951 // We can add some data to grpc message header to distribute bytes more equally across frames. 952 // Copy on the stack to avoid generating garbage 953 var localBuf [http2MaxFrameLen]byte 954 copy(localBuf[:hSize], dataItem.h) 955 copy(localBuf[hSize:], dataItem.d[:dSize]) 956 buf = localBuf[:hSize+dSize] 957 } 958 } else { 959 buf = dataItem.d 960 } 961 962 size := hSize + dSize 963 964 // Now that outgoing flow controls are checked we can replenish str's write quota 965 str.wq.replenish(size) 966 var endStream bool 967 // If this is the last data message on this stream and all of it can be written in this iteration. 968 if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size { 969 endStream = true 970 } 971 if dataItem.onEachWrite != nil { 972 dataItem.onEachWrite() 973 } 974 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { 975 return false, err 976 } 977 str.bytesOutStanding += size 978 l.sendQuota -= uint32(size) 979 dataItem.h = dataItem.h[hSize:] 980 dataItem.d = dataItem.d[dSize:] 981 982 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. 983 str.itl.dequeue() 984 } 985 if str.itl.isEmpty() { 986 str.state = empty 987 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. 988 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 989 return false, err 990 } 991 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 992 return false, err 993 } 994 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. 995 str.state = waitingOnStreamQuota 996 } else { // Otherwise add it back to the list of active streams. 997 l.activeStreams.enqueue(str) 998 } 999 return false, nil 1000 } 1001 1002 func min(a, b int) int { 1003 if a < b { 1004 return a 1005 } 1006 return b 1007 }