mutex_timeout.go (3211B)
1 package mutexes 2 3 import ( 4 "sync" 5 "time" 6 ) 7 8 // TimeoutMutex defines a Mutex with timeouts on locks 9 type TimeoutMutex interface { 10 Mutex 11 12 // LockFunc is functionally the same as Lock(), but allows setting a custom hook called on timeout 13 LockFunc(func()) func() 14 } 15 16 // TimeoutRWMutex defines a RWMutex with timeouts on locks 17 type TimeoutRWMutex interface { 18 RWMutex 19 20 // LockFunc is functionally the same as Lock(), but allows setting a custom hook called on timeout 21 LockFunc(func()) func() 22 23 // RLockFunc is functionally the same as RLock(), but allows setting a custom hook called on timeout 24 RLockFunc(func()) func() 25 } 26 27 // WithTimeout wraps the supplied Mutex to add a timeout 28 func WithTimeout(mu Mutex, d time.Duration) TimeoutMutex { 29 return &timeoutMutex{mu: mu, d: d} 30 } 31 32 // WithTimeoutRW wraps the supplied RWMutex to add read/write timeouts 33 func WithTimeoutRW(mu RWMutex, rd, wd time.Duration) TimeoutRWMutex { 34 return &timeoutRWMutex{mu: mu, rd: rd, wd: wd} 35 } 36 37 // timeoutMutex wraps a Mutex with timeout 38 type timeoutMutex struct { 39 mu Mutex // mu is the wrapped mutex 40 d time.Duration // d is the timeout duration 41 } 42 43 func (mu *timeoutMutex) Lock() func() { 44 return mu.LockFunc(func() { panic("lock timed out") }) 45 } 46 47 func (mu *timeoutMutex) LockFunc(fn func()) func() { 48 return mutexTimeout(mu.d, mu.mu.Lock(), fn) 49 } 50 51 // TimeoutRWMutex wraps a RWMutex with timeouts 52 type timeoutRWMutex struct { 53 mu RWMutex // mu is the wrapped rwmutex 54 rd time.Duration // rd is the rlock timeout duration 55 wd time.Duration // wd is the lock timeout duration 56 } 57 58 func (mu *timeoutRWMutex) Lock() func() { 59 return mu.LockFunc(func() { panic("lock timed out") }) 60 } 61 62 func (mu *timeoutRWMutex) LockFunc(fn func()) func() { 63 return mutexTimeout(mu.wd, mu.mu.Lock(), fn) 64 } 65 66 func (mu *timeoutRWMutex) RLock() func() { 67 return mu.RLockFunc(func() { panic("rlock timed out") }) 68 } 69 70 func (mu *timeoutRWMutex) RLockFunc(fn func()) func() { 71 return mutexTimeout(mu.rd, mu.mu.RLock(), fn) 72 } 73 74 // mutexTimeout performs a timed unlock, calling supplied fn if timeout is reached 75 func mutexTimeout(d time.Duration, unlock func(), fn func()) func() { 76 if d < 1 { 77 // No timeout, just unlock 78 return unlock 79 } 80 81 // Acquire timer from pool 82 t := timerPool.Get().(*timer) 83 84 // Start the timer 85 go t.Start(d, fn) 86 87 // Return func cancelling timeout, 88 // replacing Timeout in pool and 89 // finally unlocking mutex 90 return func() { 91 defer timerPool.Put(t) 92 t.Cancel() 93 unlock() 94 } 95 } 96 97 // timerPool is the global &timer{} pool. 98 var timerPool = sync.Pool{ 99 New: func() interface{} { 100 t := time.NewTimer(time.Minute) 101 t.Stop() 102 return &timer{t: t, c: make(chan struct{})} 103 }, 104 } 105 106 // timer represents a reusable cancellable timer. 107 type timer struct { 108 t *time.Timer 109 c chan struct{} 110 } 111 112 // Start will start the timer with duration 'd', performing 'fn' on timeout. 113 func (t *timer) Start(d time.Duration, fn func()) { 114 t.t.Reset(d) 115 select { 116 // Timed out 117 case <-t.t.C: 118 fn() 119 120 // Cancelled 121 case <-t.c: 122 } 123 } 124 125 // Cancel will attempt to cancel the running timer. 126 func (t *timer) Cancel() { 127 select { 128 // cancel successful 129 case t.c <- struct{}{}: 130 if !t.t.Stop() { 131 <-t.t.C 132 } // stop timer 133 134 // already stopped 135 default: 136 } 137 }