map.go (9398B)
1 package mutexes 2 3 import ( 4 "runtime" 5 "sync" 6 "sync/atomic" 7 ) 8 9 const ( 10 // possible lock types. 11 lockTypeRead = uint8(1) << 0 12 lockTypeWrite = uint8(1) << 1 13 lockTypeMap = uint8(1) << 2 14 15 // possible mutexmap states. 16 stateUnlockd = uint8(0) 17 stateRLocked = uint8(1) 18 stateLocked = uint8(2) 19 stateInUse = uint8(3) 20 21 // default values. 22 defaultWake = 1024 23 ) 24 25 // acquireState attempts to acquire required map state for lockType. 26 func acquireState(state uint8, lt uint8) (uint8, bool) { 27 switch state { 28 // Unlocked state 29 // (all allowed) 30 case stateUnlockd: 31 32 // Keys locked, no state lock. 33 // (don't allow map locks) 34 case stateInUse: 35 if lt&lockTypeMap != 0 { 36 return 0, false 37 } 38 39 // Read locked 40 // (only allow read locks) 41 case stateRLocked: 42 if lt&lockTypeRead == 0 { 43 return 0, false 44 } 45 46 // Write locked 47 // (none allowed) 48 case stateLocked: 49 return 0, false 50 51 // shouldn't reach here 52 default: 53 panic("unexpected state") 54 } 55 56 switch { 57 // If unlocked and not a map 58 // lock request, set in use 59 case lt&lockTypeMap == 0: 60 if state == stateUnlockd { 61 state = stateInUse 62 } 63 64 // Set read lock state 65 case lt&lockTypeRead != 0: 66 state = stateRLocked 67 68 // Set write lock state 69 case lt&lockTypeWrite != 0: 70 state = stateLocked 71 72 default: 73 panic("unexpected lock type") 74 } 75 76 return state, true 77 } 78 79 // MutexMap is a structure that allows read / write locking key, performing 80 // as you'd expect a map[string]*sync.RWMutex to perform. The differences 81 // being that the entire map can itself be read / write locked, it uses memory 82 // pooling for the mutex (not quite) structures, and it is self-evicting. The 83 // core configurations of maximum no. open locks and wake modulus* are user 84 // definable. 85 // 86 // * The wake modulus is the number that the current number of open locks is 87 // modulused against to determine how often to notify sleeping goroutines. 88 // These are goroutines that are attempting to lock a key / whole map and are 89 // awaiting a permissible state (.e.g no key write locks allowed when the 90 // map is read locked). 91 type MutexMap struct { 92 queue *sync.WaitGroup 93 qucnt int32 94 95 mumap map[string]*rwmutex 96 mpool pool 97 evict []*rwmutex 98 99 count int32 100 maxmu int32 101 wake int32 102 103 mapmu sync.Mutex 104 state uint8 105 } 106 107 // NewMap returns a new MutexMap instance with provided max no. open mutexes. 108 func NewMap(max, wake int32) MutexMap { 109 // Determine wake mod. 110 if wake < 1 { 111 wake = defaultWake 112 } 113 114 // Determine max no. mutexes 115 if max < 1 { 116 procs := runtime.GOMAXPROCS(0) 117 max = wake * int32(procs) 118 } 119 120 return MutexMap{ 121 queue: &sync.WaitGroup{}, 122 mumap: make(map[string]*rwmutex, max), 123 maxmu: max, 124 wake: wake, 125 } 126 } 127 128 // SET sets the MutexMap max open locks and wake modulus, returns current values. 129 // For values less than zero defaults are set, and zero is non-op. 130 func (mm *MutexMap) SET(max, wake int32) (int32, int32) { 131 mm.mapmu.Lock() 132 133 switch { 134 // Set default wake 135 case wake < 0: 136 mm.wake = defaultWake 137 138 // Set supplied wake 139 case wake > 0: 140 mm.wake = wake 141 } 142 143 switch { 144 // Set default max 145 case max < 0: 146 procs := runtime.GOMAXPROCS(0) 147 mm.maxmu = wake * int32(procs) 148 149 // Set supplied max 150 case max > 0: 151 mm.maxmu = max 152 } 153 154 // Fetch values 155 max = mm.maxmu 156 wake = mm.wake 157 158 mm.mapmu.Unlock() 159 return max, wake 160 } 161 162 // spinLock will wait (using a mutex to sleep thread) until conditional returns true. 163 func (mm *MutexMap) spinLock(cond func() bool) { 164 for { 165 // Acquire map lock 166 mm.mapmu.Lock() 167 168 if cond() { 169 return 170 } 171 172 // Current queue ptr 173 queue := mm.queue 174 175 // Queue ourselves 176 queue.Add(1) 177 mm.qucnt++ 178 179 // Unlock map 180 mm.mapmu.Unlock() 181 182 // Wait on notify 183 mm.queue.Wait() 184 } 185 } 186 187 // lock will acquire a lock of given type on the 'mutex' at key. 188 func (mm *MutexMap) lock(key string, lt uint8) func() { 189 var ok bool 190 var mu *rwmutex 191 192 // Spin lock until returns true 193 mm.spinLock(func() bool { 194 // Check not overloaded 195 if !(mm.count < mm.maxmu) { 196 return false 197 } 198 199 // Attempt to acquire usable map state 200 state, ok := acquireState(mm.state, lt) 201 if !ok { 202 return false 203 } 204 205 // Update state 206 mm.state = state 207 208 // Ensure mutex at key 209 // is in lockable state 210 mu, ok = mm.mumap[key] 211 return !ok || mu.CanLock(lt) 212 }) 213 214 // Incr count 215 mm.count++ 216 217 if !ok { 218 // No mutex found for key 219 220 // Alloc mu from pool 221 mu = mm.mpool.Acquire() 222 mm.mumap[key] = mu 223 224 // Set our key 225 mu.key = key 226 227 // Queue for eviction 228 mm.evict = append(mm.evict, mu) 229 } 230 231 // Lock mutex 232 mu.Lock(lt) 233 234 // Unlock map 235 mm.mapmu.Unlock() 236 237 return func() { 238 mm.mapmu.Lock() 239 mu.Unlock() 240 mm.cleanup() 241 } 242 } 243 244 // lockMap will lock the whole map under given lock type. 245 func (mm *MutexMap) lockMap(lt uint8) { 246 // Spin lock until returns true 247 mm.spinLock(func() bool { 248 // Attempt to acquire usable map state 249 state, ok := acquireState(mm.state, lt) 250 if !ok { 251 return false 252 } 253 254 // Update state 255 mm.state = state 256 257 return true 258 }) 259 260 // Incr count 261 mm.count++ 262 263 // State acquired, unlock 264 mm.mapmu.Unlock() 265 } 266 267 // cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map. 268 func (mm *MutexMap) cleanup() { 269 // Decr count 270 mm.count-- 271 272 // Calculate current wake modulus 273 wakemod := mm.count % mm.wake 274 275 if mm.count != 0 && wakemod != 0 { 276 // Fast path => no cleanup. 277 // Unlock, return early 278 mm.mapmu.Unlock() 279 return 280 } 281 282 go func() { 283 if wakemod == 0 { 284 // Release queued goroutines 285 mm.queue.Add(-int(mm.qucnt)) 286 287 // Allocate new queue and reset 288 mm.queue = &sync.WaitGroup{} 289 mm.qucnt = 0 290 } 291 292 if mm.count == 0 { 293 // Perform evictions 294 for _, mu := range mm.evict { 295 key := mu.key 296 mu.key = "" 297 delete(mm.mumap, key) 298 mm.mpool.Release(mu) 299 } 300 301 // Reset map state 302 mm.evict = mm.evict[:0] 303 mm.state = stateUnlockd 304 mm.mpool.GC() 305 } 306 307 // Unlock map 308 mm.mapmu.Unlock() 309 }() 310 } 311 312 // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. 313 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. 314 func (mm *MutexMap) RLockMap() *LockState { 315 mm.lockMap(lockTypeRead | lockTypeMap) 316 return &LockState{ 317 mmap: mm, 318 ltyp: lockTypeRead, 319 } 320 } 321 322 // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. 323 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. 324 func (mm *MutexMap) LockMap() *LockState { 325 mm.lockMap(lockTypeWrite | lockTypeMap) 326 return &LockState{ 327 mmap: mm, 328 ltyp: lockTypeWrite, 329 } 330 } 331 332 // RLock acquires a mutex read lock for supplied key, returning an RUnlock function. 333 func (mm *MutexMap) RLock(key string) (runlock func()) { 334 return mm.lock(key, lockTypeRead) 335 } 336 337 // Lock acquires a mutex write lock for supplied key, returning an Unlock function. 338 func (mm *MutexMap) Lock(key string) (unlock func()) { 339 return mm.lock(key, lockTypeWrite) 340 } 341 342 // LockState represents a window to a locked MutexMap. 343 type LockState struct { 344 wait sync.WaitGroup 345 mmap *MutexMap 346 done uint32 347 ltyp uint8 348 } 349 350 // Lock: see MutexMap.Lock() definition. Will panic if map only read locked. 351 func (st *LockState) Lock(key string) (unlock func()) { 352 return st.lock(key, lockTypeWrite) 353 } 354 355 // RLock: see MutexMap.RLock() definition. 356 func (st *LockState) RLock(key string) (runlock func()) { 357 return st.lock(key, lockTypeRead) 358 } 359 360 // lock: see MutexMap.lock() definition. 361 func (st *LockState) lock(key string, lt uint8) func() { 362 st.wait.Add(1) // track lock 363 364 if atomic.LoadUint32(&st.done) == 1 { 365 panic("called (r)lock on unlocked state") 366 } else if lt&lockTypeWrite != 0 && 367 st.ltyp&lockTypeWrite == 0 { 368 panic("called lock on rlocked map") 369 } 370 371 var ok bool 372 var mu *rwmutex 373 374 // Spin lock until returns true 375 st.mmap.spinLock(func() bool { 376 // Check not overloaded 377 if !(st.mmap.count < st.mmap.maxmu) { 378 return false 379 } 380 381 // Ensure mutex at key 382 // is in lockable state 383 mu, ok = st.mmap.mumap[key] 384 return !ok || mu.CanLock(lt) 385 }) 386 387 // Incr count 388 st.mmap.count++ 389 390 if !ok { 391 // No mutex found for key 392 393 // Alloc mu from pool 394 mu = st.mmap.mpool.Acquire() 395 st.mmap.mumap[key] = mu 396 397 // Set our key 398 mu.key = key 399 400 // Queue for eviction 401 st.mmap.evict = append(st.mmap.evict, mu) 402 } 403 404 // Lock mutex 405 mu.Lock(lt) 406 407 // Unlock map 408 st.mmap.mapmu.Unlock() 409 410 return func() { 411 st.mmap.mapmu.Lock() 412 mu.Unlock() 413 st.mmap.cleanup() 414 st.wait.Add(-1) 415 } 416 } 417 418 // UnlockMap will close this state and release the currently locked map. 419 func (st *LockState) UnlockMap() { 420 if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { 421 panic("called unlockmap on expired state") 422 } 423 st.wait.Wait() 424 st.mmap.mapmu.Lock() 425 st.mmap.cleanup() 426 } 427 428 // rwmutex is a very simple *representation* of a read-write 429 // mutex, though not one in implementation. it works by 430 // tracking the lock state for a given map key, which is 431 // protected by the map's mutex. 432 type rwmutex struct { 433 rcnt int32 // read lock count 434 lock uint8 // lock type 435 key string // map key 436 } 437 438 func (mu *rwmutex) CanLock(lt uint8) bool { 439 return mu.lock == 0 || 440 (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0) 441 } 442 443 func (mu *rwmutex) Lock(lt uint8) { 444 // Set lock type 445 mu.lock = lt 446 447 if lt&lockTypeRead != 0 { 448 // RLock, increment 449 mu.rcnt++ 450 } 451 } 452 453 func (mu *rwmutex) Unlock() { 454 if mu.rcnt > 0 { 455 // RUnlock 456 mu.rcnt-- 457 } 458 459 if mu.rcnt == 0 { 460 // Total unlock 461 mu.lock = 0 462 } 463 }