gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

sequential_handler.go (2667B)


      1 package dbus
      2 
      3 import (
      4 	"sync"
      5 )
      6 
      7 // NewSequentialSignalHandler returns an instance of a new
      8 // signal handler that guarantees sequential processing of signals. It is a
      9 // guarantee of this signal handler that signals will be written to
     10 // channels in the order they are received on the DBus connection.
     11 func NewSequentialSignalHandler() SignalHandler {
     12 	return &sequentialSignalHandler{}
     13 }
     14 
// sequentialSignalHandler is the SignalHandler implementation returned by
// NewSequentialSignalHandler.
//
// mu guards closed and signals: DeliverSignal holds it for reading, while
// Terminate, AddSignal and RemoveSignal hold it for writing.
// closed is set by Terminate; once it is true every method returns without
// doing anything.
// signals holds one entry per AddSignal call, in registration order.
type sequentialSignalHandler struct {
	mu      sync.RWMutex
	closed  bool
	signals []*sequentialSignalChannelData
}
     20 
     21 func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
     22 	sh.mu.RLock()
     23 	defer sh.mu.RUnlock()
     24 	if sh.closed {
     25 		return
     26 	}
     27 	for _, scd := range sh.signals {
     28 		scd.deliver(signal)
     29 	}
     30 }
     31 
     32 func (sh *sequentialSignalHandler) Terminate() {
     33 	sh.mu.Lock()
     34 	defer sh.mu.Unlock()
     35 	if sh.closed {
     36 		return
     37 	}
     38 
     39 	for _, scd := range sh.signals {
     40 		scd.close()
     41 		close(scd.ch)
     42 	}
     43 	sh.closed = true
     44 	sh.signals = nil
     45 }
     46 
     47 func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
     48 	sh.mu.Lock()
     49 	defer sh.mu.Unlock()
     50 	if sh.closed {
     51 		return
     52 	}
     53 	sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
     54 }
     55 
     56 func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
     57 	sh.mu.Lock()
     58 	defer sh.mu.Unlock()
     59 	if sh.closed {
     60 		return
     61 	}
     62 	for i := len(sh.signals) - 1; i >= 0; i-- {
     63 		if ch == sh.signals[i].ch {
     64 			sh.signals[i].close()
     65 			copy(sh.signals[i:], sh.signals[i+1:])
     66 			sh.signals[len(sh.signals)-1] = nil
     67 			sh.signals = sh.signals[:len(sh.signals)-1]
     68 		}
     69 	}
     70 }
     71 
// sequentialSignalChannelData couples one registered channel with the
// goroutine that buffers signals for it.
//
// ch is the caller-supplied channel that signals are finally written to.
// in carries signals from deliver to the bufferSignals goroutine.
// done is closed by bufferSignals when that goroutine exits.
type sequentialSignalChannelData struct {
	ch   chan<- *Signal
	in   chan *Signal
	done chan struct{}
}
     77 
     78 func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
     79 	scd := &sequentialSignalChannelData{
     80 		ch:   ch,
     81 		in:   make(chan *Signal),
     82 		done: make(chan struct{}),
     83 	}
     84 	go scd.bufferSignals()
     85 	return scd
     86 }
     87 
     88 func (scd *sequentialSignalChannelData) bufferSignals() {
     89 	defer close(scd.done)
     90 
     91 	// Ensure that signals are delivered to scd.ch in the same
     92 	// order they are received from scd.in.
     93 	var queue []*Signal
     94 	for {
     95 		if len(queue) == 0 {
     96 			signal, ok := <- scd.in
     97 			if !ok {
     98 				return
     99 			}
    100 			queue = append(queue, signal)
    101 		}
    102 		select {
    103 		case scd.ch <- queue[0]:
    104 			copy(queue, queue[1:])
    105 			queue[len(queue)-1] = nil
    106 			queue = queue[:len(queue)-1]
    107 		case signal, ok := <-scd.in:
    108 			if !ok {
    109 				return
    110 			}
    111 			queue = append(queue, signal)
    112 		}
    113 	}
    114 }
    115 
// deliver hands signal to the bufferSignals goroutine by sending it on
// scd.in. It must not be called after close, since sending on a closed
// channel panics.
func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
	scd.in <- signal
}
    119 
// close stops the bufferSignals goroutine and waits for it to exit. It does
// not close scd.ch. It must be called at most once, because closing scd.in a
// second time panics.
func (scd *sequentialSignalChannelData) close() {
	// Closing the intake channel is the signal for bufferSignals to return.
	close(scd.in)
	// Ensure that bufferSignals() has exited and won't attempt
	// any future sends on scd.ch
	<-scd.done
}