writesched_random.go (2051B)
1 // Copyright 2014 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package http2 6 7 import "math" 8 9 // NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2 10 // priorities. Control frames like SETTINGS and PING are written before DATA 11 // frames, but if no control frames are queued and multiple streams have queued 12 // HEADERS or DATA frames, Pop selects a ready stream arbitrarily. 13 func NewRandomWriteScheduler() WriteScheduler { 14 return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)} 15 } 16 17 type randomWriteScheduler struct { 18 // zero are frames not associated with a specific stream. 19 zero writeQueue 20 21 // sq contains the stream-specific queues, keyed by stream ID. 22 // When a stream is idle, closed, or emptied, it's deleted 23 // from the map. 24 sq map[uint32]*writeQueue 25 26 // pool of empty queues for reuse. 27 queuePool writeQueuePool 28 } 29 30 func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) { 31 // no-op: idle streams are not tracked 32 } 33 34 func (ws *randomWriteScheduler) CloseStream(streamID uint32) { 35 q, ok := ws.sq[streamID] 36 if !ok { 37 return 38 } 39 delete(ws.sq, streamID) 40 ws.queuePool.put(q) 41 } 42 43 func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) { 44 // no-op: priorities are ignored 45 } 46 47 func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) { 48 if wr.isControl() { 49 ws.zero.push(wr) 50 return 51 } 52 id := wr.StreamID() 53 q, ok := ws.sq[id] 54 if !ok { 55 q = ws.queuePool.get() 56 ws.sq[id] = q 57 } 58 q.push(wr) 59 } 60 61 func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) { 62 // Control and RST_STREAM frames first. 63 if !ws.zero.empty() { 64 return ws.zero.shift(), true 65 } 66 // Iterate over all non-idle streams until finding one that can be consumed. 67 for streamID, q := range ws.sq { 68 if wr, ok := q.consume(math.MaxInt32); ok { 69 if q.empty() { 70 delete(ws.sq, streamID) 71 ws.queuePool.put(q) 72 } 73 return wr, true 74 } 75 } 76 return FrameWriteRequest{}, false 77 }