sequential_handler.go (2667B)
1 package dbus 2 3 import ( 4 "sync" 5 ) 6 7 // NewSequentialSignalHandler returns an instance of a new 8 // signal handler that guarantees sequential processing of signals. It is a 9 // guarantee of this signal handler that signals will be written to 10 // channels in the order they are received on the DBus connection. 11 func NewSequentialSignalHandler() SignalHandler { 12 return &sequentialSignalHandler{} 13 } 14 15 type sequentialSignalHandler struct { 16 mu sync.RWMutex 17 closed bool 18 signals []*sequentialSignalChannelData 19 } 20 21 func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) { 22 sh.mu.RLock() 23 defer sh.mu.RUnlock() 24 if sh.closed { 25 return 26 } 27 for _, scd := range sh.signals { 28 scd.deliver(signal) 29 } 30 } 31 32 func (sh *sequentialSignalHandler) Terminate() { 33 sh.mu.Lock() 34 defer sh.mu.Unlock() 35 if sh.closed { 36 return 37 } 38 39 for _, scd := range sh.signals { 40 scd.close() 41 close(scd.ch) 42 } 43 sh.closed = true 44 sh.signals = nil 45 } 46 47 func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) { 48 sh.mu.Lock() 49 defer sh.mu.Unlock() 50 if sh.closed { 51 return 52 } 53 sh.signals = append(sh.signals, newSequentialSignalChannelData(ch)) 54 } 55 56 func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) { 57 sh.mu.Lock() 58 defer sh.mu.Unlock() 59 if sh.closed { 60 return 61 } 62 for i := len(sh.signals) - 1; i >= 0; i-- { 63 if ch == sh.signals[i].ch { 64 sh.signals[i].close() 65 copy(sh.signals[i:], sh.signals[i+1:]) 66 sh.signals[len(sh.signals)-1] = nil 67 sh.signals = sh.signals[:len(sh.signals)-1] 68 } 69 } 70 } 71 72 type sequentialSignalChannelData struct { 73 ch chan<- *Signal 74 in chan *Signal 75 done chan struct{} 76 } 77 78 func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData { 79 scd := &sequentialSignalChannelData{ 80 ch: ch, 81 in: make(chan *Signal), 82 done: make(chan struct{}), 83 } 84 go scd.bufferSignals() 85 return scd 86 } 87 88 func (scd *sequentialSignalChannelData) bufferSignals() { 89 defer close(scd.done) 90 91 // Ensure that signals are delivered to scd.ch in the same 92 // order they are received from scd.in. 93 var queue []*Signal 94 for { 95 if len(queue) == 0 { 96 signal, ok := <- scd.in 97 if !ok { 98 return 99 } 100 queue = append(queue, signal) 101 } 102 select { 103 case scd.ch <- queue[0]: 104 copy(queue, queue[1:]) 105 queue[len(queue)-1] = nil 106 queue = queue[:len(queue)-1] 107 case signal, ok := <-scd.in: 108 if !ok { 109 return 110 } 111 queue = append(queue, signal) 112 } 113 } 114 } 115 116 func (scd *sequentialSignalChannelData) deliver(signal *Signal) { 117 scd.in <- signal 118 } 119 120 func (scd *sequentialSignalChannelData) close() { 121 close(scd.in) 122 // Ensure that bufferSignals() has exited and won't attempt 123 // any future sends on scd.ch 124 <-scd.done 125 }