flowcontrol.go (5705B)
1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package transport 20 21 import ( 22 "fmt" 23 "math" 24 "sync" 25 "sync/atomic" 26 ) 27 28 // writeQuota is a soft limit on the amount of data a stream can 29 // schedule before some of it is written out. 30 type writeQuota struct { 31 quota int32 32 // get waits on read from when quota goes less than or equal to zero. 33 // replenish writes on it when quota goes positive again. 34 ch chan struct{} 35 // done is triggered in error case. 36 done <-chan struct{} 37 // replenish is called by loopyWriter to give quota back to. 38 // It is implemented as a field so that it can be updated 39 // by tests. 40 replenish func(n int) 41 } 42 43 func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { 44 w := &writeQuota{ 45 quota: sz, 46 ch: make(chan struct{}, 1), 47 done: done, 48 } 49 w.replenish = w.realReplenish 50 return w 51 } 52 53 func (w *writeQuota) get(sz int32) error { 54 for { 55 if atomic.LoadInt32(&w.quota) > 0 { 56 atomic.AddInt32(&w.quota, -sz) 57 return nil 58 } 59 select { 60 case <-w.ch: 61 continue 62 case <-w.done: 63 return errStreamDone 64 } 65 } 66 } 67 68 func (w *writeQuota) realReplenish(n int) { 69 sz := int32(n) 70 a := atomic.AddInt32(&w.quota, sz) 71 b := a - sz 72 if b <= 0 && a > 0 { 73 select { 74 case w.ch <- struct{}{}: 75 default: 76 } 77 } 78 } 79 80 type trInFlow struct { 81 limit uint32 82 unacked uint32 83 effectiveWindowSize uint32 84 } 85 86 func (f *trInFlow) newLimit(n uint32) uint32 { 87 d := n - f.limit 88 f.limit = n 89 f.updateEffectiveWindowSize() 90 return d 91 } 92 93 func (f *trInFlow) onData(n uint32) uint32 { 94 f.unacked += n 95 if f.unacked >= f.limit/4 { 96 w := f.unacked 97 f.unacked = 0 98 f.updateEffectiveWindowSize() 99 return w 100 } 101 f.updateEffectiveWindowSize() 102 return 0 103 } 104 105 func (f *trInFlow) reset() uint32 { 106 w := f.unacked 107 f.unacked = 0 108 f.updateEffectiveWindowSize() 109 return w 110 } 111 112 func (f *trInFlow) updateEffectiveWindowSize() { 113 atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked) 114 } 115 116 func (f *trInFlow) getSize() uint32 { 117 return atomic.LoadUint32(&f.effectiveWindowSize) 118 } 119 120 // TODO(mmukhi): Simplify this code. 121 // inFlow deals with inbound flow control 122 type inFlow struct { 123 mu sync.Mutex 124 // The inbound flow control limit for pending data. 125 limit uint32 126 // pendingData is the overall data which have been received but not been 127 // consumed by applications. 128 pendingData uint32 129 // The amount of data the application has consumed but grpc has not sent 130 // window update for them. Used to reduce window update frequency. 131 pendingUpdate uint32 132 // delta is the extra window update given by receiver when an application 133 // is reading data bigger in size than the inFlow limit. 134 delta uint32 135 } 136 137 // newLimit updates the inflow window to a new value n. 138 // It assumes that n is always greater than the old limit. 139 func (f *inFlow) newLimit(n uint32) { 140 f.mu.Lock() 141 f.limit = n 142 f.mu.Unlock() 143 } 144 145 func (f *inFlow) maybeAdjust(n uint32) uint32 { 146 if n > uint32(math.MaxInt32) { 147 n = uint32(math.MaxInt32) 148 } 149 f.mu.Lock() 150 defer f.mu.Unlock() 151 // estSenderQuota is the receiver's view of the maximum number of bytes the sender 152 // can send without a window update. 153 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) 154 // estUntransmittedData is the maximum number of bytes the sends might not have put 155 // on the wire yet. A value of 0 or less means that we have already received all or 156 // more bytes than the application is requesting to read. 157 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. 158 // This implies that unless we send a window update, the sender won't be able to send all the bytes 159 // for this message. Therefore we must send an update over the limit since there's an active read 160 // request from the application. 161 if estUntransmittedData > estSenderQuota { 162 // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec. 163 if f.limit+n > maxWindowSize { 164 f.delta = maxWindowSize - f.limit 165 } else { 166 // Send a window update for the whole message and not just the difference between 167 // estUntransmittedData and estSenderQuota. This will be helpful in case the message 168 // is padded; We will fallback on the current available window(at least a 1/4th of the limit). 169 f.delta = n 170 } 171 return f.delta 172 } 173 return 0 174 } 175 176 // onData is invoked when some data frame is received. It updates pendingData. 177 func (f *inFlow) onData(n uint32) error { 178 f.mu.Lock() 179 f.pendingData += n 180 if f.pendingData+f.pendingUpdate > f.limit+f.delta { 181 limit := f.limit 182 rcvd := f.pendingData + f.pendingUpdate 183 f.mu.Unlock() 184 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) 185 } 186 f.mu.Unlock() 187 return nil 188 } 189 190 // onRead is invoked when the application reads the data. It returns the window size 191 // to be sent to the peer. 192 func (f *inFlow) onRead(n uint32) uint32 { 193 f.mu.Lock() 194 if f.pendingData == 0 { 195 f.mu.Unlock() 196 return 0 197 } 198 f.pendingData -= n 199 if n > f.delta { 200 n -= f.delta 201 f.delta = 0 202 } else { 203 f.delta -= n 204 n = 0 205 } 206 f.pendingUpdate += n 207 if f.pendingUpdate >= f.limit/4 { 208 wu := f.pendingUpdate 209 f.pendingUpdate = 0 210 f.mu.Unlock() 211 return wu 212 } 213 f.mu.Unlock() 214 return 0 215 }