writesched_roundrobin.go (2814B)
1 // Copyright 2023 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package http2 6 7 import ( 8 "fmt" 9 "math" 10 ) 11 12 type roundRobinWriteScheduler struct { 13 // control contains control frames (SETTINGS, PING, etc.). 14 control writeQueue 15 16 // streams maps stream ID to a queue. 17 streams map[uint32]*writeQueue 18 19 // stream queues are stored in a circular linked list. 20 // head is the next stream to write, or nil if there are no streams open. 21 head *writeQueue 22 23 // pool of empty queues for reuse. 24 queuePool writeQueuePool 25 } 26 27 // newRoundRobinWriteScheduler constructs a new write scheduler. 28 // The round robin scheduler priorizes control frames 29 // like SETTINGS and PING over DATA frames. 30 // When there are no control frames to send, it performs a round-robin 31 // selection from the ready streams. 32 func newRoundRobinWriteScheduler() WriteScheduler { 33 ws := &roundRobinWriteScheduler{ 34 streams: make(map[uint32]*writeQueue), 35 } 36 return ws 37 } 38 39 func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) { 40 if ws.streams[streamID] != nil { 41 panic(fmt.Errorf("stream %d already opened", streamID)) 42 } 43 q := ws.queuePool.get() 44 ws.streams[streamID] = q 45 if ws.head == nil { 46 ws.head = q 47 q.next = q 48 q.prev = q 49 } else { 50 // Queues are stored in a ring. 51 // Insert the new stream before ws.head, putting it at the end of the list. 52 q.prev = ws.head.prev 53 q.next = ws.head 54 q.prev.next = q 55 q.next.prev = q 56 } 57 } 58 59 func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) { 60 q := ws.streams[streamID] 61 if q == nil { 62 return 63 } 64 if q.next == q { 65 // This was the only open stream. 66 ws.head = nil 67 } else { 68 q.prev.next = q.next 69 q.next.prev = q.prev 70 if ws.head == q { 71 ws.head = q.next 72 } 73 } 74 delete(ws.streams, streamID) 75 ws.queuePool.put(q) 76 } 77 78 func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {} 79 80 func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) { 81 if wr.isControl() { 82 ws.control.push(wr) 83 return 84 } 85 q := ws.streams[wr.StreamID()] 86 if q == nil { 87 // This is a closed stream. 88 // wr should not be a HEADERS or DATA frame. 89 // We push the request onto the control queue. 90 if wr.DataSize() > 0 { 91 panic("add DATA on non-open stream") 92 } 93 ws.control.push(wr) 94 return 95 } 96 q.push(wr) 97 } 98 99 func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) { 100 // Control and RST_STREAM frames first. 101 if !ws.control.empty() { 102 return ws.control.shift(), true 103 } 104 if ws.head == nil { 105 return FrameWriteRequest{}, false 106 } 107 q := ws.head 108 for { 109 if wr, ok := q.consume(math.MaxInt32); ok { 110 ws.head = q.next 111 return wr, true 112 } 113 q = q.next 114 if q == ws.head { 115 break 116 } 117 } 118 return FrameWriteRequest{}, false 119 }