gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

writesched_roundrobin.go (2814B)


      1 // Copyright 2023 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package http2
      6 
      7 import (
      8 	"fmt"
      9 	"math"
     10 )
     11 
     12 type roundRobinWriteScheduler struct {
     13 	// control contains control frames (SETTINGS, PING, etc.).
     14 	control writeQueue
     15 
     16 	// streams maps stream ID to a queue.
     17 	streams map[uint32]*writeQueue
     18 
     19 	// stream queues are stored in a circular linked list.
     20 	// head is the next stream to write, or nil if there are no streams open.
     21 	head *writeQueue
     22 
     23 	// pool of empty queues for reuse.
     24 	queuePool writeQueuePool
     25 }
     26 
     27 // newRoundRobinWriteScheduler constructs a new write scheduler.
     28 // The round robin scheduler priorizes control frames
     29 // like SETTINGS and PING over DATA frames.
     30 // When there are no control frames to send, it performs a round-robin
     31 // selection from the ready streams.
     32 func newRoundRobinWriteScheduler() WriteScheduler {
     33 	ws := &roundRobinWriteScheduler{
     34 		streams: make(map[uint32]*writeQueue),
     35 	}
     36 	return ws
     37 }
     38 
     39 func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
     40 	if ws.streams[streamID] != nil {
     41 		panic(fmt.Errorf("stream %d already opened", streamID))
     42 	}
     43 	q := ws.queuePool.get()
     44 	ws.streams[streamID] = q
     45 	if ws.head == nil {
     46 		ws.head = q
     47 		q.next = q
     48 		q.prev = q
     49 	} else {
     50 		// Queues are stored in a ring.
     51 		// Insert the new stream before ws.head, putting it at the end of the list.
     52 		q.prev = ws.head.prev
     53 		q.next = ws.head
     54 		q.prev.next = q
     55 		q.next.prev = q
     56 	}
     57 }
     58 
     59 func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
     60 	q := ws.streams[streamID]
     61 	if q == nil {
     62 		return
     63 	}
     64 	if q.next == q {
     65 		// This was the only open stream.
     66 		ws.head = nil
     67 	} else {
     68 		q.prev.next = q.next
     69 		q.next.prev = q.prev
     70 		if ws.head == q {
     71 			ws.head = q.next
     72 		}
     73 	}
     74 	delete(ws.streams, streamID)
     75 	ws.queuePool.put(q)
     76 }
     77 
     78 func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
     79 
     80 func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
     81 	if wr.isControl() {
     82 		ws.control.push(wr)
     83 		return
     84 	}
     85 	q := ws.streams[wr.StreamID()]
     86 	if q == nil {
     87 		// This is a closed stream.
     88 		// wr should not be a HEADERS or DATA frame.
     89 		// We push the request onto the control queue.
     90 		if wr.DataSize() > 0 {
     91 			panic("add DATA on non-open stream")
     92 		}
     93 		ws.control.push(wr)
     94 		return
     95 	}
     96 	q.push(wr)
     97 }
     98 
     99 func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
    100 	// Control and RST_STREAM frames first.
    101 	if !ws.control.empty() {
    102 		return ws.control.shift(), true
    103 	}
    104 	if ws.head == nil {
    105 		return FrameWriteRequest{}, false
    106 	}
    107 	q := ws.head
    108 	for {
    109 		if wr, ok := q.consume(math.MaxInt32); ok {
    110 			ws.head = q.next
    111 			return wr, true
    112 		}
    113 		q = q.next
    114 		if q == ws.head {
    115 			break
    116 		}
    117 	}
    118 	return FrameWriteRequest{}, false
    119 }