gtsocial-umbx


controlbuf.go (27344B)


      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package transport
     20 
     21 import (
     22 	"bytes"
     23 	"errors"
     24 	"fmt"
     25 	"net"
     26 	"runtime"
     27 	"strconv"
     28 	"sync"
     29 	"sync/atomic"
     30 
     31 	"golang.org/x/net/http2"
     32 	"golang.org/x/net/http2/hpack"
     33 	"google.golang.org/grpc/internal/grpclog"
     34 	"google.golang.org/grpc/internal/grpcutil"
     35 	"google.golang.org/grpc/status"
     36 )
     37 
     38 var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
     39 	e.SetMaxDynamicTableSizeLimit(v)
     40 }
     41 
     42 type itemNode struct {
     43 	it   interface{}
     44 	next *itemNode
     45 }
     46 
     47 type itemList struct {
     48 	head *itemNode
     49 	tail *itemNode
     50 }
     51 
     52 func (il *itemList) enqueue(i interface{}) {
     53 	n := &itemNode{it: i}
     54 	if il.tail == nil {
     55 		il.head, il.tail = n, n
     56 		return
     57 	}
     58 	il.tail.next = n
     59 	il.tail = n
     60 }
     61 
     62 // peek returns the first item in the list without removing it from the
     63 // list.
     64 func (il *itemList) peek() interface{} {
     65 	return il.head.it
     66 }
     67 
     68 func (il *itemList) dequeue() interface{} {
     69 	if il.head == nil {
     70 		return nil
     71 	}
     72 	i := il.head.it
     73 	il.head = il.head.next
     74 	if il.head == nil {
     75 		il.tail = nil
     76 	}
     77 	return i
     78 }
     79 
     80 func (il *itemList) dequeueAll() *itemNode {
     81 	h := il.head
     82 	il.head, il.tail = nil, nil
     83 	return h
     84 }
     85 
     86 func (il *itemList) isEmpty() bool {
     87 	return il.head == nil
     88 }
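
The itemList above is the FIFO queue that backs the control buffer. A minimal sketch of its semantics, written as a hypothetical test in the same transport package (the test file and names are illustrative, not part of this repository):

package transport

import "testing"

// TestItemListFIFO checks that items come back out of an itemList in
// insertion order and that the list reports empty once drained.
func TestItemListFIFO(t *testing.T) {
	il := &itemList{}
	for i := 1; i <= 3; i++ {
		il.enqueue(i)
	}
	for want := 1; !il.isEmpty(); want++ {
		if got := il.dequeue().(int); got != want {
			t.Fatalf("dequeue() = %d, want %d", got, want)
		}
	}
	if il.dequeue() != nil {
		t.Fatal("dequeue on an empty list should return nil")
	}
}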
     89 
     90 // The following defines various control items which could flow through
     91 // the control buffer of the transport. They represent different aspects of
     92 // control tasks, e.g., flow control, settings, stream resetting, etc.
     93 
     94 // maxQueuedTransportResponseFrames is the maximum number of queued "transport
     95 // response" frames we will buffer before preventing new reads from occurring on
     96 // the transport.  These are control frames sent in response to client requests,
     97 // such as RST_STREAM due to bad headers or settings acks.
     98 const maxQueuedTransportResponseFrames = 50
     99 
    100 type cbItem interface {
    101 	isTransportResponseFrame() bool
    102 }
    103 
    104 // registerStream is used to register an incoming stream with the loopy writer.
    105 type registerStream struct {
    106 	streamID uint32
    107 	wq       *writeQuota
    108 }
    109 
    110 func (*registerStream) isTransportResponseFrame() bool { return false }
    111 
    112 // headerFrame is also used to register a stream on the client side.
    113 type headerFrame struct {
    114 	streamID   uint32
    115 	hf         []hpack.HeaderField
    116 	endStream  bool               // Valid on server side.
    117 	initStream func(uint32) error // Used only on the client side.
    118 	onWrite    func()
    119 	wq         *writeQuota    // write quota for the stream created.
    120 	cleanup    *cleanupStream // Valid on the server side.
    121 	onOrphaned func(error)    // Valid on client-side
    122 }
    123 
    124 func (h *headerFrame) isTransportResponseFrame() bool {
    125 	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
    126 }
    127 
    128 type cleanupStream struct {
    129 	streamID uint32
    130 	rst      bool
    131 	rstCode  http2.ErrCode
    132 	onWrite  func()
    133 }
    134 
    135 func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
    136 
    137 type earlyAbortStream struct {
    138 	httpStatus     uint32
    139 	streamID       uint32
    140 	contentSubtype string
    141 	status         *status.Status
    142 	rst            bool
    143 }
    144 
    145 func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
    146 
    147 type dataFrame struct {
    148 	streamID  uint32
    149 	endStream bool
    150 	h         []byte
    151 	d         []byte
    152 	// onEachWrite is called every time
    153 	// a part of d is written out.
    154 	onEachWrite func()
    155 }
    156 
    157 func (*dataFrame) isTransportResponseFrame() bool { return false }
    158 
    159 type incomingWindowUpdate struct {
    160 	streamID  uint32
    161 	increment uint32
    162 }
    163 
    164 func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
    165 
    166 type outgoingWindowUpdate struct {
    167 	streamID  uint32
    168 	increment uint32
    169 }
    170 
    171 func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
    172 	return false // window updates are throttled by thresholds
    173 }
    174 
    175 type incomingSettings struct {
    176 	ss []http2.Setting
    177 }
    178 
    179 func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
    180 
    181 type outgoingSettings struct {
    182 	ss []http2.Setting
    183 }
    184 
    185 func (*outgoingSettings) isTransportResponseFrame() bool { return false }
    186 
    187 type incomingGoAway struct {
    188 }
    189 
    190 func (*incomingGoAway) isTransportResponseFrame() bool { return false }
    191 
    192 type goAway struct {
    193 	code      http2.ErrCode
    194 	debugData []byte
    195 	headsUp   bool
    196 	closeConn error // if set, loopyWriter will exit, resulting in conn closure
    197 }
    198 
    199 func (*goAway) isTransportResponseFrame() bool { return false }
    200 
    201 type ping struct {
    202 	ack  bool
    203 	data [8]byte
    204 }
    205 
    206 func (*ping) isTransportResponseFrame() bool { return true }
    207 
    208 type outFlowControlSizeRequest struct {
    209 	resp chan uint32
    210 }
    211 
    212 func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
    213 
    214 // closeConnection is an instruction to tell the loopy writer to flush the
    215 // framer and exit, which will cause the transport's connection to be closed
    216 // (by the client or server).  The transport itself will close after the reader
    217 // encounters the EOF caused by the connection closure.
    218 type closeConnection struct{}
    219 
    220 func (closeConnection) isTransportResponseFrame() bool { return false }
    221 
    222 type outStreamState int
    223 
    224 const (
    225 	active outStreamState = iota
    226 	empty
    227 	waitingOnStreamQuota
    228 )
    229 
    230 type outStream struct {
    231 	id               uint32
    232 	state            outStreamState
    233 	itl              *itemList
    234 	bytesOutStanding int
    235 	wq               *writeQuota
    236 
    237 	next *outStream
    238 	prev *outStream
    239 }
    240 
    241 func (s *outStream) deleteSelf() {
    242 	if s.prev != nil {
    243 		s.prev.next = s.next
    244 	}
    245 	if s.next != nil {
    246 		s.next.prev = s.prev
    247 	}
    248 	s.next, s.prev = nil, nil
    249 }
    250 
    251 type outStreamList struct {
    252 	// Following are sentinel objects that mark the
    253 	// beginning and end of the list. They do not
    254 	// contain any item lists. All valid objects are
    255 	// inserted in between them.
    256 	// This is needed so that an outStream object can
    257 	// deleteSelf() in O(1) time without knowing which
    258 	// list it belongs to.
    259 	head *outStream
    260 	tail *outStream
    261 }
    262 
    263 func newOutStreamList() *outStreamList {
    264 	head, tail := new(outStream), new(outStream)
    265 	head.next = tail
    266 	tail.prev = head
    267 	return &outStreamList{
    268 		head: head,
    269 		tail: tail,
    270 	}
    271 }
    272 
    273 func (l *outStreamList) enqueue(s *outStream) {
    274 	e := l.tail.prev
    275 	e.next = s
    276 	s.prev = e
    277 	s.next = l.tail
    278 	l.tail.prev = s
    279 }
    280 
    281 // dequeue removes a stream from the beginning of the list and returns it.
    282 func (l *outStreamList) dequeue() *outStream {
    283 	b := l.head.next
    284 	if b == l.tail {
    285 		return nil
    286 	}
    287 	b.deleteSelf()
    288 	return b
    289 }
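
Because of the sentinel head and tail, an outStream can unlink itself without a reference to the list that holds it. A hypothetical in-package test (names are illustrative) showing the O(1) self-removal:

package transport

import "testing"

// TestOutStreamListDeleteSelf shows a stream unlinking itself in O(1),
// leaving the remaining stream at the head of the list.
func TestOutStreamListDeleteSelf(t *testing.T) {
	l := newOutStreamList()
	a, b := &outStream{id: 1}, &outStream{id: 2}
	l.enqueue(a)
	l.enqueue(b)
	a.deleteSelf() // unlink a without going through l
	if s := l.dequeue(); s != b {
		t.Fatalf("dequeue() = %+v, want stream 2", s)
	}
	if l.dequeue() != nil {
		t.Fatal("list should now be empty")
	}
}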
    290 
    291 // controlBuffer is a way to pass information to loopy.
    292 // Information is passed as specific struct types called control frames.
    293 // A control frame not only represents data, messages or headers to be sent out
    294 // but can also be used to instruct loopy to update its internal state.
    295 // It shouldn't be confused with an HTTP2 frame, although some of the control frames
    296 // like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
    297 type controlBuffer struct {
    298 	ch              chan struct{}
    299 	done            <-chan struct{}
    300 	mu              sync.Mutex
    301 	consumerWaiting bool
    302 	list            *itemList
    303 	err             error
    304 
    305 	// transportResponseFrames counts the number of queued items that represent
    306 	// the response of an action initiated by the peer.  trfChan is created
    307 	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
    308 	// closed and nilled when transportResponseFrames drops below the
    309 	// threshold.  Both fields are protected by mu.
    310 	transportResponseFrames int
    311 	trfChan                 atomic.Value // chan struct{}
    312 }
    313 
    314 func newControlBuffer(done <-chan struct{}) *controlBuffer {
    315 	return &controlBuffer{
    316 		ch:   make(chan struct{}, 1),
    317 		list: &itemList{},
    318 		done: done,
    319 	}
    320 }
    321 
    322 // throttle blocks if there are too many queued transport response frames (e.g.,
    323 // incomingSettings or cleanupStreams carrying a RST_STREAM) in the controlbuf.
    324 func (c *controlBuffer) throttle() {
    325 	ch, _ := c.trfChan.Load().(chan struct{})
    326 	if ch != nil {
    327 		select {
    328 		case <-ch:
    329 		case <-c.done:
    330 		}
    331 	}
    332 }
    333 
    334 func (c *controlBuffer) put(it cbItem) error {
    335 	_, err := c.executeAndPut(nil, it)
    336 	return err
    337 }
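
A minimal sketch of the producer/consumer hand-off, written as a hypothetical test in the transport package (the ping frame is just a convenient control item; get is defined further below):

package transport

import "testing"

// TestControlBufferPutGet queues a single control frame and reads it back,
// mirroring how a transport hands frames to the loopy writer.
func TestControlBufferPutGet(t *testing.T) {
	done := make(chan struct{})
	defer close(done)
	cbuf := newControlBuffer(done)
	if err := cbuf.put(&ping{data: [8]byte{1}}); err != nil {
		t.Fatalf("put: %v", err)
	}
	it, err := cbuf.get(true) // blocks until an item is available or done is closed
	if err != nil {
		t.Fatalf("get: %v", err)
	}
	if _, ok := it.(*ping); !ok {
		t.Fatalf("get returned %T, want *ping", it)
	}
}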
    338 
    339 func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
    340 	var wakeUp bool
    341 	c.mu.Lock()
    342 	if c.err != nil {
    343 		c.mu.Unlock()
    344 		return false, c.err
    345 	}
    346 	if f != nil {
    347 		if !f(it) { // f wasn't successful
    348 			c.mu.Unlock()
    349 			return false, nil
    350 		}
    351 	}
    352 	if c.consumerWaiting {
    353 		wakeUp = true
    354 		c.consumerWaiting = false
    355 	}
    356 	c.list.enqueue(it)
    357 	if it.isTransportResponseFrame() {
    358 		c.transportResponseFrames++
    359 		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
    360 			// We are adding the frame that puts us over the threshold; create
    361 			// a throttling channel.
    362 			c.trfChan.Store(make(chan struct{}))
    363 		}
    364 	}
    365 	c.mu.Unlock()
    366 	if wakeUp {
    367 		select {
    368 		case c.ch <- struct{}{}:
    369 		default:
    370 		}
    371 	}
    372 	return true, nil
    373 }
    374 
    375 // Note: the argument f should never be nil.
    376 func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
    377 	c.mu.Lock()
    378 	if c.err != nil {
    379 		c.mu.Unlock()
    380 		return false, c.err
    381 	}
    382 	if !f(it) { // f wasn't successful
    383 		c.mu.Unlock()
    384 		return false, nil
    385 	}
    386 	c.mu.Unlock()
    387 	return true, nil
    388 }
    389 
    390 func (c *controlBuffer) get(block bool) (interface{}, error) {
    391 	for {
    392 		c.mu.Lock()
    393 		if c.err != nil {
    394 			c.mu.Unlock()
    395 			return nil, c.err
    396 		}
    397 		if !c.list.isEmpty() {
    398 			h := c.list.dequeue().(cbItem)
    399 			if h.isTransportResponseFrame() {
    400 				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
    401 					// We are removing the frame that put us over the
    402 					// threshold; close and clear the throttling channel.
    403 					ch := c.trfChan.Load().(chan struct{})
    404 					close(ch)
    405 					c.trfChan.Store((chan struct{})(nil))
    406 				}
    407 				c.transportResponseFrames--
    408 			}
    409 			c.mu.Unlock()
    410 			return h, nil
    411 		}
    412 		if !block {
    413 			c.mu.Unlock()
    414 			return nil, nil
    415 		}
    416 		c.consumerWaiting = true
    417 		c.mu.Unlock()
    418 		select {
    419 		case <-c.ch:
    420 		case <-c.done:
    421 			return nil, errors.New("transport closed by client")
    422 		}
    423 	}
    424 }
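
The throttling hand-off between executeAndPut and get can be seen directly: queuing maxQueuedTransportResponseFrames response frames arms trfChan, and dequeuing one clears it again. A hypothetical in-package test (illustrative only):

package transport

import "testing"

// TestControlBufferThrottling arms the throttle by queuing the maximum number
// of transport response frames, then disarms it by dequeuing one of them.
func TestControlBufferThrottling(t *testing.T) {
	done := make(chan struct{})
	defer close(done)
	cbuf := newControlBuffer(done)
	for i := 0; i < maxQueuedTransportResponseFrames; i++ {
		if err := cbuf.put(&ping{}); err != nil {
			t.Fatalf("put: %v", err)
		}
	}
	if ch, _ := cbuf.trfChan.Load().(chan struct{}); ch == nil {
		t.Fatal("throttling channel should be armed at the threshold")
	}
	if _, err := cbuf.get(false); err != nil { // drop below the threshold
		t.Fatalf("get: %v", err)
	}
	if ch, _ := cbuf.trfChan.Load().(chan struct{}); ch != nil {
		t.Fatal("throttling channel should be cleared below the threshold")
	}
	cbuf.throttle() // returns immediately now that the channel is cleared
}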
    425 
    426 func (c *controlBuffer) finish() {
    427 	c.mu.Lock()
    428 	if c.err != nil {
    429 		c.mu.Unlock()
    430 		return
    431 	}
    432 	c.err = ErrConnClosing
    433 	// There may be headers for streams in the control buffer.
    434 	// These streams need to be cleaned out since the transport
    435 	// is not yet aware of them.
    436 	for head := c.list.dequeueAll(); head != nil; head = head.next {
    437 		hdr, ok := head.it.(*headerFrame)
    438 		if !ok {
    439 			continue
    440 		}
    441 		if hdr.onOrphaned != nil { // It will be nil on the server-side.
    442 			hdr.onOrphaned(ErrConnClosing)
    443 		}
    444 	}
    445 	// In case throttle() is currently in flight, it needs to be unblocked.
    446 	// Otherwise, the transport may not close, since the transport is closed by
    447 	// the reader encountering the connection error.
    448 	ch, _ := c.trfChan.Load().(chan struct{})
    449 	if ch != nil {
    450 		close(ch)
    451 	}
    452 	c.trfChan.Store((chan struct{})(nil))
    453 	c.mu.Unlock()
    454 }
    455 
    456 type side int
    457 
    458 const (
    459 	clientSide side = iota
    460 	serverSide
    461 )
    462 
    463 // Loopy receives frames from the control buffer.
    464 // Each frame is handled individually; most of the work done by loopy goes
    465 // into handling data frames. Loopy maintains a queue of active streams, and each
    466 // stream maintains a queue of data frames; as loopy receives data frames
    467 // they are added to the queue of the relevant stream.
    468 // Loopy goes over this list of active streams by processing one node every iteration,
    469 // thereby closely resembling round-robin scheduling over all streams. While
    470 // processing a stream, loopy writes out data bytes from this stream, capped by the min
    471 // of http2MaxFrameLen, connection-level flow control, and stream-level flow control.
    472 type loopyWriter struct {
    473 	side      side
    474 	cbuf      *controlBuffer
    475 	sendQuota uint32
    476 	oiws      uint32 // outbound initial window size.
    477 	// estdStreams is a map of all established streams that are not cleaned up yet.
    478 	// On the client side, this is all streams whose headers were sent out.
    479 	// On the server side, this is all streams whose headers were received.
    480 	estdStreams map[uint32]*outStream // Established streams.
    481 	// activeStreams is a linked-list of all streams that have data to send and some
    482 	// stream-level flow control quota.
    483 	// Each of these streams internally has a list of data items (and perhaps trailers
    484 	// on the server side) to be sent out.
    485 	activeStreams *outStreamList
    486 	framer        *framer
    487 	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
    488 	hEnc          *hpack.Encoder // HPACK encoder.
    489 	bdpEst        *bdpEstimator
    490 	draining      bool
    491 	conn          net.Conn
    492 	logger        *grpclog.PrefixLogger
    493 
    494 	// Side-specific handlers
    495 	ssGoAwayHandler func(*goAway) (bool, error)
    496 }
    497 
    498 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
    499 	var buf bytes.Buffer
    500 	l := &loopyWriter{
    501 		side:          s,
    502 		cbuf:          cbuf,
    503 		sendQuota:     defaultWindowSize,
    504 		oiws:          defaultWindowSize,
    505 		estdStreams:   make(map[uint32]*outStream),
    506 		activeStreams: newOutStreamList(),
    507 		framer:        fr,
    508 		hBuf:          &buf,
    509 		hEnc:          hpack.NewEncoder(&buf),
    510 		bdpEst:        bdpEst,
    511 		conn:          conn,
    512 		logger:        logger,
    513 	}
    514 	return l
    515 }
    516 
    517 const minBatchSize = 1000
    518 
    519 // run should be run in a separate goroutine.
    520 // It reads control frames from controlBuf and processes them by:
    521 // 1. Updating loopy's internal state, and/or
    522 // 2. Writing out HTTP2 frames on the wire.
    523 //
    524 // Loopy keeps all active streams with data to send in a linked-list.
    525 // All streams in the activeStreams linked-list must have both:
    526 // 1. Data to send, and
    527 // 2. Stream level flow control quota available.
    528 //
    529 // In each iteration of the run loop, other than processing the incoming control
    530 // frame, loopy calls processData, which processes one node from the
    531 // activeStreams linked-list.  This results in writing of HTTP2 frames into an
    532 // underlying write buffer.  When there are no more control frames to read from
    533 // controlBuf, loopy flushes the write buffer.  As an optimization to increase
    534 // the batch size for each flush, loopy yields the processor once, if the batch
    535 // size is too low, to give stream goroutines a chance to fill it up.
    536 //
    537 // Upon exiting, if the error causing the exit is not an I/O error, run()
    538 // flushes and closes the underlying connection.  Otherwise, the connection is
    539 // left open to allow the I/O error to be encountered by the reader instead.
    540 func (l *loopyWriter) run() (err error) {
    541 	defer func() {
    542 		if l.logger.V(logLevel) {
    543 			l.logger.Infof("loopyWriter exiting with error: %v", err)
    544 		}
    545 		if !isIOError(err) {
    546 			l.framer.writer.Flush()
    547 			l.conn.Close()
    548 		}
    549 		l.cbuf.finish()
    550 	}()
    551 	for {
    552 		it, err := l.cbuf.get(true)
    553 		if err != nil {
    554 			return err
    555 		}
    556 		if err = l.handle(it); err != nil {
    557 			return err
    558 		}
    559 		if _, err = l.processData(); err != nil {
    560 			return err
    561 		}
    562 		gosched := true
    563 	hasdata:
    564 		for {
    565 			it, err := l.cbuf.get(false)
    566 			if err != nil {
    567 				return err
    568 			}
    569 			if it != nil {
    570 				if err = l.handle(it); err != nil {
    571 					return err
    572 				}
    573 				if _, err = l.processData(); err != nil {
    574 					return err
    575 				}
    576 				continue hasdata
    577 			}
    578 			isEmpty, err := l.processData()
    579 			if err != nil {
    580 				return err
    581 			}
    582 			if !isEmpty {
    583 				continue hasdata
    584 			}
    585 			if gosched {
    586 				gosched = false
    587 				if l.framer.writer.offset < minBatchSize {
    588 					runtime.Gosched()
    589 					continue hasdata
    590 				}
    591 			}
    592 			l.framer.writer.Flush()
    593 			break hasdata
    594 		}
    595 	}
    596 }
    597 
    598 func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
    599 	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
    600 }
    601 
    602 func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
    603 	// A window update on stream 0 replenishes the connection-level send quota.
    604 	if w.streamID == 0 {
    605 		l.sendQuota += w.increment
    606 		return
    607 	}
    608 	// Find the stream and update it.
    609 	if str, ok := l.estdStreams[w.streamID]; ok {
    610 		str.bytesOutStanding -= int(w.increment)
    611 		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
    612 			str.state = active
    613 			l.activeStreams.enqueue(str)
    614 			return
    615 		}
    616 	}
    617 }
    618 
    619 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
    620 	return l.framer.fr.WriteSettings(s.ss...)
    621 }
    622 
    623 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
    624 	l.applySettings(s.ss)
    625 	return l.framer.fr.WriteSettingsAck()
    626 }
    627 
    628 func (l *loopyWriter) registerStreamHandler(h *registerStream) {
    629 	str := &outStream{
    630 		id:    h.streamID,
    631 		state: empty,
    632 		itl:   &itemList{},
    633 		wq:    h.wq,
    634 	}
    635 	l.estdStreams[h.streamID] = str
    636 }
    637 
    638 func (l *loopyWriter) headerHandler(h *headerFrame) error {
    639 	if l.side == serverSide {
    640 		str, ok := l.estdStreams[h.streamID]
    641 		if !ok {
    642 			if l.logger.V(logLevel) {
    643 				l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
    644 			}
    645 			return nil
    646 		}
    647 		// Case 1.A: Server is responding with headers.
    648 		if !h.endStream {
    649 			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
    650 		}
    651 		// else:  Case 1.B: Server wants to close stream.
    652 
    653 		if str.state != empty { // either active or waiting on stream quota.
    654 			// add it to str's list of items.
    655 			str.itl.enqueue(h)
    656 			return nil
    657 		}
    658 		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
    659 			return err
    660 		}
    661 		return l.cleanupStreamHandler(h.cleanup)
    662 	}
    663 	// Case 2: Client wants to originate stream.
    664 	str := &outStream{
    665 		id:    h.streamID,
    666 		state: empty,
    667 		itl:   &itemList{},
    668 		wq:    h.wq,
    669 	}
    670 	return l.originateStream(str, h)
    671 }
    672 
    673 func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
    674 	// l.draining is set when handling a GoAway, in which case we want to avoid
    675 	// creating new streams.
    676 	if l.draining {
    677 		// TODO: provide a better error with the reason we are in draining.
    678 		hdr.onOrphaned(errStreamDrain)
    679 		return nil
    680 	}
    681 	if err := hdr.initStream(str.id); err != nil {
    682 		return err
    683 	}
    684 	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
    685 		return err
    686 	}
    687 	l.estdStreams[str.id] = str
    688 	return nil
    689 }
    690 
    691 func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
    692 	if onWrite != nil {
    693 		onWrite()
    694 	}
    695 	l.hBuf.Reset()
    696 	for _, f := range hf {
    697 		if err := l.hEnc.WriteField(f); err != nil {
    698 			if l.logger.V(logLevel) {
    699 				l.logger.Warningf("Encountered error while encoding headers: %v", err)
    700 			}
    701 		}
    702 	}
    703 	var (
    704 		err               error
    705 		endHeaders, first bool
    706 	)
    707 	first = true
    708 	for !endHeaders {
    709 		size := l.hBuf.Len()
    710 		if size > http2MaxFrameLen {
    711 			size = http2MaxFrameLen
    712 		} else {
    713 			endHeaders = true
    714 		}
    715 		if first {
    716 			first = false
    717 			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
    718 				StreamID:      streamID,
    719 				BlockFragment: l.hBuf.Next(size),
    720 				EndStream:     endStream,
    721 				EndHeaders:    endHeaders,
    722 			})
    723 		} else {
    724 			err = l.framer.fr.WriteContinuation(
    725 				streamID,
    726 				endHeaders,
    727 				l.hBuf.Next(size),
    728 			)
    729 		}
    730 		if err != nil {
    731 			return err
    732 		}
    733 	}
    734 	return nil
    735 }
    736 
    737 func (l *loopyWriter) preprocessData(df *dataFrame) {
    738 	str, ok := l.estdStreams[df.streamID]
    739 	if !ok {
    740 		return
    741 	}
    742 	// If we got data for a stream, it means that the
    743 	// stream was originated and its headers were sent out.
    744 	str.itl.enqueue(df)
    745 	if str.state == empty {
    746 		str.state = active
    747 		l.activeStreams.enqueue(str)
    748 	}
    749 }
    750 
    751 func (l *loopyWriter) pingHandler(p *ping) error {
    752 	if !p.ack {
    753 		l.bdpEst.timesnap(p.data)
    754 	}
    755 	return l.framer.fr.WritePing(p.ack, p.data)
    756 
    757 }
    758 
    759 func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
    760 	o.resp <- l.sendQuota
    761 }
    762 
    763 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
    764 	c.onWrite()
    765 	if str, ok := l.estdStreams[c.streamID]; ok {
    766 		// On the server side it could be a trailers-only response or
    767 		// a RST_STREAM before stream initialization; thus the stream might
    768 		// not be established yet.
    769 		delete(l.estdStreams, c.streamID)
    770 		str.deleteSelf()
    771 	}
    772 	if c.rst { // If RST_STREAM needs to be sent.
    773 		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
    774 			return err
    775 		}
    776 	}
    777 	if l.draining && len(l.estdStreams) == 0 {
    778 		// Flush and close the connection; we are done with it.
    779 		return errors.New("finished processing active streams while in draining mode")
    780 	}
    781 	return nil
    782 }
    783 
    784 func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
    785 	if l.side == clientSide {
    786 		return errors.New("earlyAbortStream not handled on client")
    787 	}
    788 	// In case the caller forgets to set the http status, default to 200.
    789 	if eas.httpStatus == 0 {
    790 		eas.httpStatus = 200
    791 	}
    792 	headerFields := []hpack.HeaderField{
    793 		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
    794 		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
    795 		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
    796 		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
    797 	}
    798 
    799 	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
    800 		return err
    801 	}
    802 	if eas.rst {
    803 		if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
    804 			return err
    805 		}
    806 	}
    807 	return nil
    808 }
    809 
    810 func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
    811 	if l.side == clientSide {
    812 		l.draining = true
    813 		if len(l.estdStreams) == 0 {
    814 			// Flush and close the connection; we are done with it.
    815 			return errors.New("received GOAWAY with no active streams")
    816 		}
    817 	}
    818 	return nil
    819 }
    820 
    821 func (l *loopyWriter) goAwayHandler(g *goAway) error {
    822 	// Handling of an outgoing GoAway is very specific to the side.
    823 	if l.ssGoAwayHandler != nil {
    824 		draining, err := l.ssGoAwayHandler(g)
    825 		if err != nil {
    826 			return err
    827 		}
    828 		l.draining = draining
    829 	}
    830 	return nil
    831 }
    832 
    833 func (l *loopyWriter) handle(i interface{}) error {
    834 	switch i := i.(type) {
    835 	case *incomingWindowUpdate:
    836 		l.incomingWindowUpdateHandler(i)
    837 	case *outgoingWindowUpdate:
    838 		return l.outgoingWindowUpdateHandler(i)
    839 	case *incomingSettings:
    840 		return l.incomingSettingsHandler(i)
    841 	case *outgoingSettings:
    842 		return l.outgoingSettingsHandler(i)
    843 	case *headerFrame:
    844 		return l.headerHandler(i)
    845 	case *registerStream:
    846 		l.registerStreamHandler(i)
    847 	case *cleanupStream:
    848 		return l.cleanupStreamHandler(i)
    849 	case *earlyAbortStream:
    850 		return l.earlyAbortStreamHandler(i)
    851 	case *incomingGoAway:
    852 		return l.incomingGoAwayHandler(i)
    853 	case *dataFrame:
    854 		l.preprocessData(i)
    855 	case *ping:
    856 		return l.pingHandler(i)
    857 	case *goAway:
    858 		return l.goAwayHandler(i)
    859 	case *outFlowControlSizeRequest:
    860 		l.outFlowControlSizeRequestHandler(i)
    861 	case closeConnection:
    862 		// Just return a non-I/O error and run() will flush and close the
    863 		// connection.
    864 		return ErrConnClosing
    865 	default:
    866 		return fmt.Errorf("transport: unknown control message type %T", i)
    867 	}
    868 	return nil
    869 }
    870 
    871 func (l *loopyWriter) applySettings(ss []http2.Setting) {
    872 	for _, s := range ss {
    873 		switch s.ID {
    874 		case http2.SettingInitialWindowSize:
    875 			o := l.oiws
    876 			l.oiws = s.Val
    877 			if o < l.oiws {
    878 				// If the new limit is greater, make all depleted streams active.
    879 				for _, stream := range l.estdStreams {
    880 					if stream.state == waitingOnStreamQuota {
    881 						stream.state = active
    882 						l.activeStreams.enqueue(stream)
    883 					}
    884 				}
    885 			}
    886 		case http2.SettingHeaderTableSize:
    887 			updateHeaderTblSize(l.hEnc, s.Val)
    888 		}
    889 	}
    890 }
    891 
    892 // processData removes the first stream from active streams, writes out at most 16KB
    893 // of its data, and then puts it at the end of activeStreams if there's still more data
    894 // to be sent and the stream has some stream-level flow control quota left.
    895 func (l *loopyWriter) processData() (bool, error) {
    896 	if l.sendQuota == 0 {
    897 		return true, nil
    898 	}
    899 	str := l.activeStreams.dequeue() // Remove the first stream.
    900 	if str == nil {
    901 		return true, nil
    902 	}
    903 	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
    904 	// A data item is represented by a dataFrame, since it later translates into
    905 	// multiple HTTP2 data frames.
    906 	// Every dataFrame has two buffers; h holds the grpc-message header and d holds the actual data.
    907 	// As an optimization to keep wire traffic low, data from d is coalesced with h so that a single
    908 	// HTTP2 data frame can be as close to the maximum frame size as possible.
    909 
    910 	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
    911 		// Client sends out empty data frame with endStream = true
    912 		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
    913 			return false, err
    914 		}
    915 		str.itl.dequeue() // remove the empty data item from stream
    916 		if str.itl.isEmpty() {
    917 			str.state = empty
    918 		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
    919 			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
    920 				return false, err
    921 			}
    922 			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
    923 				return false, err
    924 			}
    925 		} else {
    926 			l.activeStreams.enqueue(str)
    927 		}
    928 		return false, nil
    929 	}
    930 	var (
    931 		buf []byte
    932 	)
    933 	// Figure out the maximum size we can send
    934 	maxSize := http2MaxFrameLen
    935 	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
    936 		str.state = waitingOnStreamQuota
    937 		return false, nil
    938 	} else if maxSize > strQuota {
    939 		maxSize = strQuota
    940 	}
    941 	if maxSize > int(l.sendQuota) { // connection-level flow control.
    942 		maxSize = int(l.sendQuota)
    943 	}
    944 	// Compute how much of the header and data we can send within quota and max frame length
    945 	hSize := min(maxSize, len(dataItem.h))
    946 	dSize := min(maxSize-hSize, len(dataItem.d))
    947 	if hSize != 0 {
    948 		if dSize == 0 {
    949 			buf = dataItem.h
    950 		} else {
    951 			// We can add some data to the grpc message header to distribute bytes more equally across frames.
    952 			// Copy on the stack to avoid generating garbage
    953 			var localBuf [http2MaxFrameLen]byte
    954 			copy(localBuf[:hSize], dataItem.h)
    955 			copy(localBuf[hSize:], dataItem.d[:dSize])
    956 			buf = localBuf[:hSize+dSize]
    957 		}
    958 	} else {
    959 		buf = dataItem.d
    960 	}
    961 
    962 	size := hSize + dSize
    963 
    964 	// Now that outgoing flow controls are checked we can replenish str's write quota
    965 	str.wq.replenish(size)
    966 	var endStream bool
    967 	// If this is the last data message on this stream and all of it can be written in this iteration.
    968 	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
    969 		endStream = true
    970 	}
    971 	if dataItem.onEachWrite != nil {
    972 		dataItem.onEachWrite()
    973 	}
    974 	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
    975 		return false, err
    976 	}
    977 	str.bytesOutStanding += size
    978 	l.sendQuota -= uint32(size)
    979 	dataItem.h = dataItem.h[hSize:]
    980 	dataItem.d = dataItem.d[dSize:]
    981 
    982 	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
    983 		str.itl.dequeue()
    984 	}
    985 	if str.itl.isEmpty() {
    986 		str.state = empty
    987 	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
    988 		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
    989 			return false, err
    990 		}
    991 		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
    992 			return false, err
    993 		}
    994 	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
    995 		str.state = waitingOnStreamQuota
    996 	} else { // Otherwise add it back to the list of active streams.
    997 		l.activeStreams.enqueue(str)
    998 	}
    999 	return false, nil
   1000 }
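
As a concrete illustration of the sizing logic in processData, the following hypothetical in-package test reproduces one iteration's arithmetic with made-up numbers; it assumes the package-level http2MaxFrameLen constant (16384) defined elsewhere in the transport package and the min helper defined at the end of this file.

package transport

import "testing"

// TestProcessDataSizingExample mirrors the per-iteration size computation in
// processData: the frame is capped by the 16KB frame limit, the remaining
// stream-level window, and the connection-level send quota, with header bytes
// h placed before payload bytes d.
func TestProcessDataSizingExample(t *testing.T) {
	oiws, bytesOutStanding := 65536, 51200 // stream-level window and bytes in flight
	sendQuota := 20480                     // connection-level quota
	hLen, dLen := 5, 30720                 // lengths of dataItem.h and dataItem.d

	maxSize := http2MaxFrameLen // 16384
	if strQuota := oiws - bytesOutStanding; maxSize > strQuota {
		maxSize = strQuota // stream window (14336) is the tightest limit here
	}
	if maxSize > sendQuota {
		maxSize = sendQuota
	}
	hSize := min(maxSize, hLen)       // 5 header bytes
	dSize := min(maxSize-hSize, dLen) // 14331 payload bytes
	if got := hSize + dSize; got != 14336 {
		t.Fatalf("frame size = %d, want 14336", got)
	}
}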
   1001 
   1002 func min(a, b int) int {
   1003 	if a < b {
   1004 		return a
   1005 	}
   1006 	return b
   1007 }