gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

map.go (9398B)


      1 package mutexes
      2 
      3 import (
      4 	"runtime"
      5 	"sync"
      6 	"sync/atomic"
      7 )
      8 
      9 const (
     10 	// possible lock types.
     11 	lockTypeRead  = uint8(1) << 0
     12 	lockTypeWrite = uint8(1) << 1
     13 	lockTypeMap   = uint8(1) << 2
     14 
     15 	// possible mutexmap states.
     16 	stateUnlockd = uint8(0)
     17 	stateRLocked = uint8(1)
     18 	stateLocked  = uint8(2)
     19 	stateInUse   = uint8(3)
     20 
     21 	// default values.
     22 	defaultWake = 1024
     23 )
     24 
     25 // acquireState attempts to acquire required map state for lockType.
     26 func acquireState(state uint8, lt uint8) (uint8, bool) {
     27 	switch state {
     28 	// Unlocked state
     29 	// (all allowed)
     30 	case stateUnlockd:
     31 
     32 	// Keys locked, no state lock.
     33 	// (don't allow map locks)
     34 	case stateInUse:
     35 		if lt&lockTypeMap != 0 {
     36 			return 0, false
     37 		}
     38 
     39 	// Read locked
     40 	// (only allow read locks)
     41 	case stateRLocked:
     42 		if lt&lockTypeRead == 0 {
     43 			return 0, false
     44 		}
     45 
     46 	// Write locked
     47 	// (none allowed)
     48 	case stateLocked:
     49 		return 0, false
     50 
     51 	// shouldn't reach here
     52 	default:
     53 		panic("unexpected state")
     54 	}
     55 
     56 	switch {
     57 	// If unlocked and not a map
     58 	// lock request, set in use
     59 	case lt&lockTypeMap == 0:
     60 		if state == stateUnlockd {
     61 			state = stateInUse
     62 		}
     63 
     64 	// Set read lock state
     65 	case lt&lockTypeRead != 0:
     66 		state = stateRLocked
     67 
     68 	// Set write lock state
     69 	case lt&lockTypeWrite != 0:
     70 		state = stateLocked
     71 
     72 	default:
     73 		panic("unexpected lock type")
     74 	}
     75 
     76 	return state, true
     77 }
     78 
     79 // MutexMap is a structure that allows read / write locking key, performing
     80 // as you'd expect a map[string]*sync.RWMutex to perform. The differences
     81 // being that the entire map can itself be read / write locked, it uses memory
     82 // pooling for the mutex (not quite) structures, and it is self-evicting. The
     83 // core configurations of maximum no. open locks and wake modulus* are user
     84 // definable.
     85 //
     86 // * The wake modulus is the number that the current number of open locks is
     87 // modulused against to determine how often to notify sleeping goroutines.
     88 // These are goroutines that are attempting to lock a key / whole map and are
     89 // awaiting a permissible state (.e.g no key write locks allowed when the
     90 // map is read locked).
     91 type MutexMap struct {
     92 	queue *sync.WaitGroup
     93 	qucnt int32
     94 
     95 	mumap map[string]*rwmutex
     96 	mpool pool
     97 	evict []*rwmutex
     98 
     99 	count int32
    100 	maxmu int32
    101 	wake  int32
    102 
    103 	mapmu sync.Mutex
    104 	state uint8
    105 }
    106 
    107 // NewMap returns a new MutexMap instance with provided max no. open mutexes.
    108 func NewMap(max, wake int32) MutexMap {
    109 	// Determine wake mod.
    110 	if wake < 1 {
    111 		wake = defaultWake
    112 	}
    113 
    114 	// Determine max no. mutexes
    115 	if max < 1 {
    116 		procs := runtime.GOMAXPROCS(0)
    117 		max = wake * int32(procs)
    118 	}
    119 
    120 	return MutexMap{
    121 		queue: &sync.WaitGroup{},
    122 		mumap: make(map[string]*rwmutex, max),
    123 		maxmu: max,
    124 		wake:  wake,
    125 	}
    126 }
    127 
    128 // SET sets the MutexMap max open locks and wake modulus, returns current values.
    129 // For values less than zero defaults are set, and zero is non-op.
    130 func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
    131 	mm.mapmu.Lock()
    132 
    133 	switch {
    134 	// Set default wake
    135 	case wake < 0:
    136 		mm.wake = defaultWake
    137 
    138 	// Set supplied wake
    139 	case wake > 0:
    140 		mm.wake = wake
    141 	}
    142 
    143 	switch {
    144 	// Set default max
    145 	case max < 0:
    146 		procs := runtime.GOMAXPROCS(0)
    147 		mm.maxmu = wake * int32(procs)
    148 
    149 	// Set supplied max
    150 	case max > 0:
    151 		mm.maxmu = max
    152 	}
    153 
    154 	// Fetch values
    155 	max = mm.maxmu
    156 	wake = mm.wake
    157 
    158 	mm.mapmu.Unlock()
    159 	return max, wake
    160 }
    161 
    162 // spinLock will wait (using a mutex to sleep thread) until conditional returns true.
    163 func (mm *MutexMap) spinLock(cond func() bool) {
    164 	for {
    165 		// Acquire map lock
    166 		mm.mapmu.Lock()
    167 
    168 		if cond() {
    169 			return
    170 		}
    171 
    172 		// Current queue ptr
    173 		queue := mm.queue
    174 
    175 		// Queue ourselves
    176 		queue.Add(1)
    177 		mm.qucnt++
    178 
    179 		// Unlock map
    180 		mm.mapmu.Unlock()
    181 
    182 		// Wait on notify
    183 		mm.queue.Wait()
    184 	}
    185 }
    186 
    187 // lock will acquire a lock of given type on the 'mutex' at key.
    188 func (mm *MutexMap) lock(key string, lt uint8) func() {
    189 	var ok bool
    190 	var mu *rwmutex
    191 
    192 	// Spin lock until returns true
    193 	mm.spinLock(func() bool {
    194 		// Check not overloaded
    195 		if !(mm.count < mm.maxmu) {
    196 			return false
    197 		}
    198 
    199 		// Attempt to acquire usable map state
    200 		state, ok := acquireState(mm.state, lt)
    201 		if !ok {
    202 			return false
    203 		}
    204 
    205 		// Update state
    206 		mm.state = state
    207 
    208 		// Ensure mutex at key
    209 		// is in lockable state
    210 		mu, ok = mm.mumap[key]
    211 		return !ok || mu.CanLock(lt)
    212 	})
    213 
    214 	// Incr count
    215 	mm.count++
    216 
    217 	if !ok {
    218 		// No mutex found for key
    219 
    220 		// Alloc mu from pool
    221 		mu = mm.mpool.Acquire()
    222 		mm.mumap[key] = mu
    223 
    224 		// Set our key
    225 		mu.key = key
    226 
    227 		// Queue for eviction
    228 		mm.evict = append(mm.evict, mu)
    229 	}
    230 
    231 	// Lock mutex
    232 	mu.Lock(lt)
    233 
    234 	// Unlock map
    235 	mm.mapmu.Unlock()
    236 
    237 	return func() {
    238 		mm.mapmu.Lock()
    239 		mu.Unlock()
    240 		mm.cleanup()
    241 	}
    242 }
    243 
    244 // lockMap will lock the whole map under given lock type.
    245 func (mm *MutexMap) lockMap(lt uint8) {
    246 	// Spin lock until returns true
    247 	mm.spinLock(func() bool {
    248 		// Attempt to acquire usable map state
    249 		state, ok := acquireState(mm.state, lt)
    250 		if !ok {
    251 			return false
    252 		}
    253 
    254 		// Update state
    255 		mm.state = state
    256 
    257 		return true
    258 	})
    259 
    260 	// Incr count
    261 	mm.count++
    262 
    263 	// State acquired, unlock
    264 	mm.mapmu.Unlock()
    265 }
    266 
    267 // cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
    268 func (mm *MutexMap) cleanup() {
    269 	// Decr count
    270 	mm.count--
    271 
    272 	// Calculate current wake modulus
    273 	wakemod := mm.count % mm.wake
    274 
    275 	if mm.count != 0 && wakemod != 0 {
    276 		// Fast path => no cleanup.
    277 		// Unlock, return early
    278 		mm.mapmu.Unlock()
    279 		return
    280 	}
    281 
    282 	go func() {
    283 		if wakemod == 0 {
    284 			// Release queued goroutines
    285 			mm.queue.Add(-int(mm.qucnt))
    286 
    287 			// Allocate new queue and reset
    288 			mm.queue = &sync.WaitGroup{}
    289 			mm.qucnt = 0
    290 		}
    291 
    292 		if mm.count == 0 {
    293 			// Perform evictions
    294 			for _, mu := range mm.evict {
    295 				key := mu.key
    296 				mu.key = ""
    297 				delete(mm.mumap, key)
    298 				mm.mpool.Release(mu)
    299 			}
    300 
    301 			// Reset map state
    302 			mm.evict = mm.evict[:0]
    303 			mm.state = stateUnlockd
    304 			mm.mpool.GC()
    305 		}
    306 
    307 		// Unlock map
    308 		mm.mapmu.Unlock()
    309 	}()
    310 }
    311 
    312 // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
    313 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
    314 func (mm *MutexMap) RLockMap() *LockState {
    315 	mm.lockMap(lockTypeRead | lockTypeMap)
    316 	return &LockState{
    317 		mmap: mm,
    318 		ltyp: lockTypeRead,
    319 	}
    320 }
    321 
    322 // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
    323 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
    324 func (mm *MutexMap) LockMap() *LockState {
    325 	mm.lockMap(lockTypeWrite | lockTypeMap)
    326 	return &LockState{
    327 		mmap: mm,
    328 		ltyp: lockTypeWrite,
    329 	}
    330 }
    331 
    332 // RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
    333 func (mm *MutexMap) RLock(key string) (runlock func()) {
    334 	return mm.lock(key, lockTypeRead)
    335 }
    336 
    337 // Lock acquires a mutex write lock for supplied key, returning an Unlock function.
    338 func (mm *MutexMap) Lock(key string) (unlock func()) {
    339 	return mm.lock(key, lockTypeWrite)
    340 }
    341 
    342 // LockState represents a window to a locked MutexMap.
    343 type LockState struct {
    344 	wait sync.WaitGroup
    345 	mmap *MutexMap
    346 	done uint32
    347 	ltyp uint8
    348 }
    349 
    350 // Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
    351 func (st *LockState) Lock(key string) (unlock func()) {
    352 	return st.lock(key, lockTypeWrite)
    353 }
    354 
    355 // RLock: see MutexMap.RLock() definition.
    356 func (st *LockState) RLock(key string) (runlock func()) {
    357 	return st.lock(key, lockTypeRead)
    358 }
    359 
    360 // lock: see MutexMap.lock() definition.
    361 func (st *LockState) lock(key string, lt uint8) func() {
    362 	st.wait.Add(1) // track lock
    363 
    364 	if atomic.LoadUint32(&st.done) == 1 {
    365 		panic("called (r)lock on unlocked state")
    366 	} else if lt&lockTypeWrite != 0 &&
    367 		st.ltyp&lockTypeWrite == 0 {
    368 		panic("called lock on rlocked map")
    369 	}
    370 
    371 	var ok bool
    372 	var mu *rwmutex
    373 
    374 	// Spin lock until returns true
    375 	st.mmap.spinLock(func() bool {
    376 		// Check not overloaded
    377 		if !(st.mmap.count < st.mmap.maxmu) {
    378 			return false
    379 		}
    380 
    381 		// Ensure mutex at key
    382 		// is in lockable state
    383 		mu, ok = st.mmap.mumap[key]
    384 		return !ok || mu.CanLock(lt)
    385 	})
    386 
    387 	// Incr count
    388 	st.mmap.count++
    389 
    390 	if !ok {
    391 		// No mutex found for key
    392 
    393 		// Alloc mu from pool
    394 		mu = st.mmap.mpool.Acquire()
    395 		st.mmap.mumap[key] = mu
    396 
    397 		// Set our key
    398 		mu.key = key
    399 
    400 		// Queue for eviction
    401 		st.mmap.evict = append(st.mmap.evict, mu)
    402 	}
    403 
    404 	// Lock mutex
    405 	mu.Lock(lt)
    406 
    407 	// Unlock map
    408 	st.mmap.mapmu.Unlock()
    409 
    410 	return func() {
    411 		st.mmap.mapmu.Lock()
    412 		mu.Unlock()
    413 		st.mmap.cleanup()
    414 		st.wait.Add(-1)
    415 	}
    416 }
    417 
    418 // UnlockMap will close this state and release the currently locked map.
    419 func (st *LockState) UnlockMap() {
    420 	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
    421 		panic("called unlockmap on expired state")
    422 	}
    423 	st.wait.Wait()
    424 	st.mmap.mapmu.Lock()
    425 	st.mmap.cleanup()
    426 }
    427 
    428 // rwmutex is a very simple *representation* of a read-write
    429 // mutex, though not one in implementation. it works by
    430 // tracking the lock state for a given map key, which is
    431 // protected by the map's mutex.
    432 type rwmutex struct {
    433 	rcnt int32  // read lock count
    434 	lock uint8  // lock type
    435 	key  string // map key
    436 }
    437 
    438 func (mu *rwmutex) CanLock(lt uint8) bool {
    439 	return mu.lock == 0 ||
    440 		(mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
    441 }
    442 
    443 func (mu *rwmutex) Lock(lt uint8) {
    444 	// Set lock type
    445 	mu.lock = lt
    446 
    447 	if lt&lockTypeRead != 0 {
    448 		// RLock, increment
    449 		mu.rcnt++
    450 	}
    451 }
    452 
    453 func (mu *rwmutex) Unlock() {
    454 	if mu.rcnt > 0 {
    455 		// RUnlock
    456 		mu.rcnt--
    457 	}
    458 
    459 	if mu.rcnt == 0 {
    460 		// Total unlock
    461 		mu.lock = 0
    462 	}
    463 }