gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

flowcontrol.go (5705B)


      1 /*
      2  *
      3  * Copyright 2014 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 package transport
     20 
     21 import (
     22 	"fmt"
     23 	"math"
     24 	"sync"
     25 	"sync/atomic"
     26 )
     27 
     28 // writeQuota is a soft limit on the amount of data a stream can
     29 // schedule before some of it is written out.
     30 type writeQuota struct {
     31 	quota int32
     32 	// get waits on read from when quota goes less than or equal to zero.
     33 	// replenish writes on it when quota goes positive again.
     34 	ch chan struct{}
     35 	// done is triggered in error case.
     36 	done <-chan struct{}
     37 	// replenish is called by loopyWriter to give quota back to.
     38 	// It is implemented as a field so that it can be updated
     39 	// by tests.
     40 	replenish func(n int)
     41 }
     42 
     43 func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
     44 	w := &writeQuota{
     45 		quota: sz,
     46 		ch:    make(chan struct{}, 1),
     47 		done:  done,
     48 	}
     49 	w.replenish = w.realReplenish
     50 	return w
     51 }
     52 
     53 func (w *writeQuota) get(sz int32) error {
     54 	for {
     55 		if atomic.LoadInt32(&w.quota) > 0 {
     56 			atomic.AddInt32(&w.quota, -sz)
     57 			return nil
     58 		}
     59 		select {
     60 		case <-w.ch:
     61 			continue
     62 		case <-w.done:
     63 			return errStreamDone
     64 		}
     65 	}
     66 }
     67 
     68 func (w *writeQuota) realReplenish(n int) {
     69 	sz := int32(n)
     70 	a := atomic.AddInt32(&w.quota, sz)
     71 	b := a - sz
     72 	if b <= 0 && a > 0 {
     73 		select {
     74 		case w.ch <- struct{}{}:
     75 		default:
     76 		}
     77 	}
     78 }
     79 
     80 type trInFlow struct {
     81 	limit               uint32
     82 	unacked             uint32
     83 	effectiveWindowSize uint32
     84 }
     85 
     86 func (f *trInFlow) newLimit(n uint32) uint32 {
     87 	d := n - f.limit
     88 	f.limit = n
     89 	f.updateEffectiveWindowSize()
     90 	return d
     91 }
     92 
     93 func (f *trInFlow) onData(n uint32) uint32 {
     94 	f.unacked += n
     95 	if f.unacked >= f.limit/4 {
     96 		w := f.unacked
     97 		f.unacked = 0
     98 		f.updateEffectiveWindowSize()
     99 		return w
    100 	}
    101 	f.updateEffectiveWindowSize()
    102 	return 0
    103 }
    104 
    105 func (f *trInFlow) reset() uint32 {
    106 	w := f.unacked
    107 	f.unacked = 0
    108 	f.updateEffectiveWindowSize()
    109 	return w
    110 }
    111 
    112 func (f *trInFlow) updateEffectiveWindowSize() {
    113 	atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
    114 }
    115 
    116 func (f *trInFlow) getSize() uint32 {
    117 	return atomic.LoadUint32(&f.effectiveWindowSize)
    118 }
    119 
    120 // TODO(mmukhi): Simplify this code.
    121 // inFlow deals with inbound flow control
    122 type inFlow struct {
    123 	mu sync.Mutex
    124 	// The inbound flow control limit for pending data.
    125 	limit uint32
    126 	// pendingData is the overall data which have been received but not been
    127 	// consumed by applications.
    128 	pendingData uint32
    129 	// The amount of data the application has consumed but grpc has not sent
    130 	// window update for them. Used to reduce window update frequency.
    131 	pendingUpdate uint32
    132 	// delta is the extra window update given by receiver when an application
    133 	// is reading data bigger in size than the inFlow limit.
    134 	delta uint32
    135 }
    136 
    137 // newLimit updates the inflow window to a new value n.
    138 // It assumes that n is always greater than the old limit.
    139 func (f *inFlow) newLimit(n uint32) {
    140 	f.mu.Lock()
    141 	f.limit = n
    142 	f.mu.Unlock()
    143 }
    144 
    145 func (f *inFlow) maybeAdjust(n uint32) uint32 {
    146 	if n > uint32(math.MaxInt32) {
    147 		n = uint32(math.MaxInt32)
    148 	}
    149 	f.mu.Lock()
    150 	defer f.mu.Unlock()
    151 	// estSenderQuota is the receiver's view of the maximum number of bytes the sender
    152 	// can send without a window update.
    153 	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
    154 	// estUntransmittedData is the maximum number of bytes the sends might not have put
    155 	// on the wire yet. A value of 0 or less means that we have already received all or
    156 	// more bytes than the application is requesting to read.
    157 	estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
    158 	// This implies that unless we send a window update, the sender won't be able to send all the bytes
    159 	// for this message. Therefore we must send an update over the limit since there's an active read
    160 	// request from the application.
    161 	if estUntransmittedData > estSenderQuota {
    162 		// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
    163 		if f.limit+n > maxWindowSize {
    164 			f.delta = maxWindowSize - f.limit
    165 		} else {
    166 			// Send a window update for the whole message and not just the difference between
    167 			// estUntransmittedData and estSenderQuota. This will be helpful in case the message
    168 			// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
    169 			f.delta = n
    170 		}
    171 		return f.delta
    172 	}
    173 	return 0
    174 }
    175 
    176 // onData is invoked when some data frame is received. It updates pendingData.
    177 func (f *inFlow) onData(n uint32) error {
    178 	f.mu.Lock()
    179 	f.pendingData += n
    180 	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
    181 		limit := f.limit
    182 		rcvd := f.pendingData + f.pendingUpdate
    183 		f.mu.Unlock()
    184 		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
    185 	}
    186 	f.mu.Unlock()
    187 	return nil
    188 }
    189 
    190 // onRead is invoked when the application reads the data. It returns the window size
    191 // to be sent to the peer.
    192 func (f *inFlow) onRead(n uint32) uint32 {
    193 	f.mu.Lock()
    194 	if f.pendingData == 0 {
    195 		f.mu.Unlock()
    196 		return 0
    197 	}
    198 	f.pendingData -= n
    199 	if n > f.delta {
    200 		n -= f.delta
    201 		f.delta = 0
    202 	} else {
    203 		f.delta -= n
    204 		n = 0
    205 	}
    206 	f.pendingUpdate += n
    207 	if f.pendingUpdate >= f.limit/4 {
    208 		wu := f.pendingUpdate
    209 		f.pendingUpdate = 0
    210 		f.mu.Unlock()
    211 		return wu
    212 	}
    213 	f.mu.Unlock()
    214 	return 0
    215 }