commit b8879ac68a30e8bccd1c96cc4630da791d8996c4
parent 8de928b5e9441d3968e5f79860f4de08d52f24ed
Author: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>
Date: Tue, 8 Mar 2022 11:56:53 +0000
[dependencies] update go-store, go-mutexes (#422)
* update go-store, go-mutexes
Signed-off-by: kim <grufwub@gmail.com>
* update vendored code
Signed-off-by: kim <grufwub@gmail.com>
Diffstat:
13 files changed, 596 insertions(+), 224 deletions(-)
diff --git a/go.mod b/go.mod
@@ -5,7 +5,7 @@ go 1.17
require (
codeberg.org/gruf/go-errors v1.0.5
codeberg.org/gruf/go-runners v1.2.0
- codeberg.org/gruf/go-store v1.3.3
+ codeberg.org/gruf/go-store v1.3.6
github.com/ReneKroon/ttlcache v1.7.0
github.com/buckket/go-blurhash v1.1.0
github.com/coreos/go-oidc/v3 v3.1.0
@@ -49,10 +49,11 @@ require (
require (
codeberg.org/gruf/go-bytes v1.0.2 // indirect
+ codeberg.org/gruf/go-fastcopy v1.1.1 // indirect
codeberg.org/gruf/go-fastpath v1.0.2 // indirect
codeberg.org/gruf/go-format v1.0.3 // indirect
codeberg.org/gruf/go-hashenc v1.0.1 // indirect
- codeberg.org/gruf/go-mutexes v1.1.0 // indirect
+ codeberg.org/gruf/go-mutexes v1.1.2 // indirect
codeberg.org/gruf/go-pools v1.0.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
diff --git a/go.sum b/go.sum
@@ -53,6 +53,8 @@ codeberg.org/gruf/go-bytes v1.0.2/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9
codeberg.org/gruf/go-cache v1.1.2/go.mod h1:/Dbc+xU72Op3hMn6x2PXF3NE9uIDFeS+sXPF00hN/7o=
codeberg.org/gruf/go-errors v1.0.5 h1:rxV70oQkfasUdggLHxOX2QAoJOMFM7XWxHQR45Zx/Fg=
codeberg.org/gruf/go-errors v1.0.5/go.mod h1:n03EpmvcmfzU3/xJKC0XXtleXXJUNFpT2fgISODvZ1Y=
+codeberg.org/gruf/go-fastcopy v1.1.1 h1:HhPCeFdVR5pwiSVDnQEGJ+J2ny9b5QgfiESc0zrWQAY=
+codeberg.org/gruf/go-fastcopy v1.1.1/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s=
codeberg.org/gruf/go-fastpath v1.0.1/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI=
codeberg.org/gruf/go-fastpath v1.0.2 h1:O3nuYPMXnN89dsgAwVFU5iCGINtPJdITWmbRe2an/iQ=
codeberg.org/gruf/go-fastpath v1.0.2/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI=
@@ -60,8 +62,8 @@ codeberg.org/gruf/go-format v1.0.3 h1:WoUGzTwZe6SIhILNvtr0qNIA7BOOCgdBlk5bUrfeii
codeberg.org/gruf/go-format v1.0.3/go.mod h1:k3TLXp1dqAXdDqxlon0yEM+3FFHdNn0D6BVJTwTy5As=
codeberg.org/gruf/go-hashenc v1.0.1 h1:EBvNe2wW8IPMUqT1XihB6/IM6KMJDLMFBxIUvmsy1f8=
codeberg.org/gruf/go-hashenc v1.0.1/go.mod h1:IfHhPCVScOiYmJLqdCQT9bYVS1nxNTV4ewMUvFWDPtc=
-codeberg.org/gruf/go-mutexes v1.1.0 h1:kMVWHLxdfGEZTetNVRncdBMeqS4M8dSJxSGbRYXyvKk=
-codeberg.org/gruf/go-mutexes v1.1.0/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8=
+codeberg.org/gruf/go-mutexes v1.1.2 h1:AMC1CFV6kMi+iBjR3yQv8yIagG3lWm68U6sQHYFHEf4=
+codeberg.org/gruf/go-mutexes v1.1.2/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8=
codeberg.org/gruf/go-nowish v1.0.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s=
codeberg.org/gruf/go-nowish v1.1.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s=
codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos=
@@ -69,8 +71,8 @@ codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4
codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw=
codeberg.org/gruf/go-runners v1.2.0 h1:tkoPrwYMkVg1o/C4PGTR1YbC11XX4r06uLPOYajBsH4=
codeberg.org/gruf/go-runners v1.2.0/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw=
-codeberg.org/gruf/go-store v1.3.3 h1:fAP9FXy6HiLPxdD7cmpSzyfKXmVvZLjqn0m7HhxVT5M=
-codeberg.org/gruf/go-store v1.3.3/go.mod h1:g4+9h3wbwZ6IW0uhpw57xywcqiy4CIj0zQLqqtjEU1M=
+codeberg.org/gruf/go-store v1.3.6 h1:OKzdvfUC+nvsWV5FiSKdk+85yvxF2Tb7K5ZtRqlDBDU=
+codeberg.org/gruf/go-store v1.3.6/go.mod h1:a4vJtZf61UyrsejskX8q+s0lZeNGy7cJLUZt+fH00wo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
diff --git a/vendor/codeberg.org/gruf/go-fastcopy/README.md b/vendor/codeberg.org/gruf/go-fastcopy/README.md
@@ -0,0 +1,3 @@
+# go-fastcopy
+
+An `io.Copy()` implementation that uses a memory pool for the copy buffer.
+\ No newline at end of file
diff --git a/vendor/codeberg.org/gruf/go-fastcopy/copy.go b/vendor/codeberg.org/gruf/go-fastcopy/copy.go
@@ -0,0 +1,134 @@
+package fastcopy
+
+import (
+ "io"
+ "sync"
+ _ "unsafe" // link to io.errInvalidWrite.
+)
+
+var (
+ // global pool instance.
+ pool = CopyPool{size: 4096}
+
+ //go:linkname errInvalidWrite io.errInvalidWrite
+ errInvalidWrite error
+)
+
+// CopyPool provides a memory pool of byte
+// buffers for io copies from readers to writers.
+type CopyPool struct {
+ size int
+ pool sync.Pool
+}
+
+// See CopyPool.Buffer().
+func Buffer(sz int) int {
+ return pool.Buffer(sz)
+}
+
+// See CopyPool.CopyN().
+func CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) {
+ return pool.CopyN(dst, src, n)
+}
+
+// See CopyPool.Copy().
+func Copy(dst io.Writer, src io.Reader) (int64, error) {
+ return pool.Copy(dst, src)
+}
+
+// Buffer sets the pool buffer size to allocate. Returns current size.
+// Note this is NOT atomically safe, please call BEFORE other calls to CopyPool.
+func (cp *CopyPool) Buffer(sz int) int {
+ if sz > 0 {
+ // update size
+ cp.size = sz
+ } else if cp.size < 1 {
+ // default size
+ return 4096
+ }
+ return cp.size
+}
+
+// CopyN performs the same logic as io.CopyN(), with the difference
+// being that the byte buffer is acquired from a memory pool.
+func (cp *CopyPool) CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) {
+ written, err := cp.Copy(dst, io.LimitReader(src, n))
+ if written == n {
+ return n, nil
+ }
+ if written < n && err == nil {
+ // src stopped early; must have been EOF.
+ err = io.EOF
+ }
+ return written, err
+}
+
+// Copy performs the same logic as io.Copy(), with the difference
+// being that the byte buffer is acquired from a memory pool.
+func (cp *CopyPool) Copy(dst io.Writer, src io.Reader) (int64, error) {
+ // Prefer using io.WriterTo to do the copy (avoids alloc + copy)
+ if wt, ok := src.(io.WriterTo); ok {
+ return wt.WriteTo(dst)
+ }
+
+ // Prefer using io.ReaderFrom to do the copy.
+ if rt, ok := dst.(io.ReaderFrom); ok {
+ return rt.ReadFrom(src)
+ }
+
+ var buf []byte
+
+ if b, ok := cp.pool.Get().([]byte); ok {
+ // Acquired buf from pool
+ buf = b
+ } else {
+ // Allocate new buffer of size
+ buf = make([]byte, cp.Buffer(0))
+ }
+
+ // Defer release to pool
+ defer cp.pool.Put(buf)
+
+ var n int64
+ for {
+ // Perform next read into buf
+ nr, err := src.Read(buf)
+ if nr > 0 {
+ // We error check AFTER checking
+ // no. read bytes so incomplete
+ // read still gets written up to nr.
+
+ // Perform next write from buf
+ nw, ew := dst.Write(buf[0:nr])
+
+ // Check for valid write
+ if nw < 0 || nr < nw {
+ if ew == nil {
+ ew = errInvalidWrite
+ }
+ return n, ew
+ }
+
+ // Incr total count
+ n += int64(nw)
+
+ // Check write error
+ if ew != nil {
+ return n, ew
+ }
+
+ // Check unequal read/writes
+ if nr != nw {
+ return n, io.ErrShortWrite
+ }
+ }
+
+ // Return on err
+ if err != nil {
+ if err == io.EOF {
+ err = nil // expected
+ }
+ return n, err
+ }
+ }
+}
diff --git a/vendor/codeberg.org/gruf/go-mutexes/debug.go b/vendor/codeberg.org/gruf/go-mutexes/debug.go
@@ -0,0 +1,39 @@
+package mutexes
+
+// func init() {
+// log.SetFlags(log.Flags() | log.Lshortfile)
+// }
+
+// type debugMutex sync.Mutex
+
+// func (mu *debugMutex) Lock() {
+// log.Output(2, "Lock()")
+// (*sync.Mutex)(mu).Lock()
+// }
+
+// func (mu *debugMutex) Unlock() {
+// log.Output(2, "Unlock()")
+// (*sync.Mutex)(mu).Unlock()
+// }
+
+// type debugRWMutex sync.RWMutex
+
+// func (mu *debugRWMutex) Lock() {
+// log.Output(2, "Lock()")
+// (*sync.RWMutex)(mu).Lock()
+// }
+
+// func (mu *debugRWMutex) Unlock() {
+// log.Output(2, "Unlock()")
+// (*sync.RWMutex)(mu).Unlock()
+// }
+
+// func (mu *debugRWMutex) RLock() {
+// log.Output(2, "RLock()")
+// (*sync.RWMutex)(mu).RLock()
+// }
+
+// func (mu *debugRWMutex) RUnlock() {
+// log.Output(2, "RUnlock()")
+// (*sync.RWMutex)(mu).RUnlock()
+// }
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -6,260 +6,347 @@ import (
"sync/atomic"
)
-// locktype defines maskable mutexmap lock types.
-type locktype uint8
-
const (
// possible lock types.
- lockTypeRead = locktype(1) << 0
- lockTypeWrite = locktype(1) << 1
- lockTypeMap = locktype(1) << 2
+ lockTypeRead = uint8(1) << 0
+ lockTypeWrite = uint8(1) << 1
+ lockTypeMap = uint8(1) << 2
// possible mutexmap states.
stateUnlockd = uint8(0)
stateRLocked = uint8(1)
stateLocked = uint8(2)
stateInUse = uint8(3)
+
+ // default values.
+ defaultWake = 1024
)
-// permitLockType returns if provided locktype is permitted to go ahead in current state.
-func permitLockType(state uint8, lt locktype) bool {
+// acquireState attempts to acquire required map state for lockType.
+func acquireState(state uint8, lt uint8) (uint8, bool) {
switch state {
// Unlocked state
// (all allowed)
case stateUnlockd:
- return true
// Keys locked, no state lock.
// (don't allow map locks)
case stateInUse:
- return lt&lockTypeMap == 0
+ if lt&lockTypeMap != 0 {
+ return 0, false
+ }
// Read locked
// (only allow read locks)
case stateRLocked:
- return lt&lockTypeRead != 0
+ if lt&lockTypeRead == 0 {
+ return 0, false
+ }
// Write locked
// (none allowed)
case stateLocked:
- return false
+ return 0, false
// shouldn't reach here
default:
panic("unexpected state")
}
+
+ switch {
+ // If unlocked and not a map
+ // lock request, set in use
+ case lt&lockTypeMap == 0:
+ if state == stateUnlockd {
+ state = stateInUse
+ }
+
+ // Set read lock state
+ case lt&lockTypeRead != 0:
+ state = stateRLocked
+
+ // Set write lock state
+ case lt&lockTypeWrite != 0:
+ state = stateLocked
+
+ default:
+ panic("unexpected lock type")
+ }
+
+ return state, true
}
-// MutexMap is a structure that allows having a map of self-evicting mutexes
-// by key. You do not need to worry about managing the contents of the map,
-// only requesting RLock/Lock for keys, and ensuring to call the returned
-// unlock functions.
+// MutexMap is a structure that allows read / write locking key, performing
+// as you'd expect a map[string]*sync.RWMutex to perform. The differences
+// being that the entire map can itself be read / write locked, it uses memory
+// pooling for the mutex (not quite) structures, and it is self-evicting. The
+// core configurations of maximum no. open locks and wake modulus* are user
+// definable.
+//
+// * The wake modulus is the number that the current number of open locks is
+// modulused against to determine how often to notify sleeping goroutines.
+// These are goroutines that are attempting to lock a key / whole map and are
+// awaiting a permissible state (.e.g no key write locks allowed when the
+// map is read locked).
type MutexMap struct {
- mus map[string]RWMutex
- mapMu sync.Mutex
- pool sync.Pool
- queue []func()
- evict []func()
+ qpool pool
+ queue []*sync.Mutex
+
+ mumap map[string]*rwmutex
+ mpool pool
+ evict []*rwmutex
+
count int32
maxmu int32
+ wake int32
+
+ mapmu sync.Mutex
state uint8
}
// NewMap returns a new MutexMap instance with provided max no. open mutexes.
-func NewMap(max int32) MutexMap {
+func NewMap(max, wake int32) MutexMap {
+ // Determine wake mod.
+ if wake < 1 {
+ wake = defaultWake
+ }
+
+ // Determine max no. mutexes
if max < 1 {
- // Default = 128 * GOMAXPROCS
procs := runtime.GOMAXPROCS(0)
- max = int32(procs * 128)
+ max = wake * int32(procs)
}
+
return MutexMap{
- mus: make(map[string]RWMutex),
- pool: sync.Pool{
- New: func() interface{} {
- return NewRW()
+ qpool: pool{
+ alloc: func() interface{} {
+ return &sync.Mutex{}
+ },
+ },
+ mumap: make(map[string]*rwmutex, max),
+ mpool: pool{
+ alloc: func() interface{} {
+ return &rwmutex{}
},
},
maxmu: max,
+ wake: wake,
}
}
-// acquire will either acquire a mutex from pool or alloc.
-func (mm *MutexMap) acquire() RWMutex {
- return mm.pool.Get().(RWMutex)
-}
+// MAX sets the MutexMap max open locks and wake modulus, returns current values.
+// For values less than zero defaults are set, and zero is non-op.
+func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
+ mm.mapmu.Lock()
+
+ switch {
+ // Set default wake
+ case wake < 0:
+ mm.wake = defaultWake
+
+ // Set supplied wake
+ case wake > 0:
+ mm.wake = wake
+ }
+
+ switch {
+ // Set default max
+ case max < 0:
+ procs := runtime.GOMAXPROCS(0)
+ mm.maxmu = wake * int32(procs)
+
+ // Set supplied max
+ case max > 0:
+ mm.maxmu = max
+ }
-// release will release provided mutex to pool.
-func (mm *MutexMap) release(mu RWMutex) {
- mm.pool.Put(mu)
+ // Fetch values
+ max = mm.maxmu
+ wake = mm.wake
+
+ mm.mapmu.Unlock()
+ return max, wake
}
-// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true,
-// returning with map lock. Note that 'cond' is performed within a map lock.
+// spinLock will wait (using a mutex to sleep thread) until conditional returns true.
func (mm *MutexMap) spinLock(cond func() bool) {
- mu := mm.acquire()
- defer mm.release(mu)
+ var mu *sync.Mutex
for {
- // Get map lock
- mm.mapMu.Lock()
+ // Acquire map lock
+ mm.mapmu.Lock()
- // Check if return
if cond() {
+ // Release mu if needed
+ if mu != nil {
+ mm.qpool.Release(mu)
+ }
return
}
+ // Alloc mu if needed
+ if mu == nil {
+ v := mm.qpool.Acquire()
+ mu = v.(*sync.Mutex)
+ }
+
// Queue ourselves
- unlock := mu.Lock()
- mm.queue = append(mm.queue, unlock)
- mm.mapMu.Unlock()
+ mm.queue = append(mm.queue, mu)
+ mu.Lock()
+
+ // Unlock map
+ mm.mapmu.Unlock()
// Wait on notify
- mu.Lock()()
+ mu.Lock()
+ mu.Unlock()
}
}
-// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return.
-func (mm *MutexMap) lockMutex(key string, lt locktype) func() {
- var unlock func()
+// lock will acquire a lock of given type on the 'mutex' at key.
+func (mm *MutexMap) lock(key string, lt uint8) func() {
+ var ok bool
+ var mu *rwmutex
+
+ // Spin lock until returns true
+ mm.spinLock(func() bool {
+ // Check not overloaded
+ if !(mm.count < mm.maxmu) {
+ return false
+ }
+
+ // Attempt to acquire usable map state
+ state, ok := acquireState(mm.state, lt)
+ if !ok {
+ return false
+ }
+
+ // Update state
+ mm.state = state
+
+ // Ensure mutex at key
+ // is in lockable state
+ mu, ok = mm.mumap[key]
+ return !ok || mu.CanLock(lt)
+ })
- // Incr counter
+ // Incr count
mm.count++
- // Check for existing mutex at key
- mu, ok := mm.mus[key]
if !ok {
+ // No mutex found for key
+
// Alloc from pool
- mu = mm.acquire()
- mm.mus[key] = mu
-
- // Queue mutex for eviction
- mm.evict = append(mm.evict, func() {
- delete(mm.mus, key)
- mm.pool.Put(mu)
- })
- }
+ v := mm.mpool.Acquire()
+ mu = v.(*rwmutex)
+ mm.mumap[key] = mu
- // If no state, set in use.
- // State will already have been
- // set if this is from LockState{}
- if mm.state == stateUnlockd {
- mm.state = stateInUse
- }
+ // Set our key
+ mu.key = key
- switch {
- // Read lock
- case lt&lockTypeRead != 0:
- unlock = mu.RLock()
+ // Queue for eviction
+ mm.evict = append(mm.evict, mu)
+ }
- // Write lock
- case lt&lockTypeWrite != 0:
- unlock = mu.Lock()
+ // Lock mutex
+ mu.Lock(lt)
- // shouldn't reach here
- default:
- panic("unexpected lock type")
- }
+ // Unlock map
+ mm.mapmu.Unlock()
- // Unlock map + return
- mm.mapMu.Unlock()
return func() {
- mm.mapMu.Lock()
- unlock()
- go mm.onUnlock()
+ mm.mapmu.Lock()
+ mu.Unlock()
+ go mm.cleanup()
}
}
-// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex.
-func (mm *MutexMap) onUnlock() {
- // Decr counter
+// lockMap will lock the whole map under given lock type.
+func (mm *MutexMap) lockMap(lt uint8) {
+ // Spin lock until returns true
+ mm.spinLock(func() bool {
+ // Attempt to acquire usable map state
+ state, ok := acquireState(mm.state, lt)
+ if !ok {
+ return false
+ }
+
+ // Update state
+ mm.state = state
+
+ return true
+ })
+
+ // Incr count
+ mm.count++
+
+ // State acquired, unlock
+ mm.mapmu.Unlock()
+}
+
+// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
+func (mm *MutexMap) cleanup() {
+ // Decr count
mm.count--
- if mm.count < 1 {
- // Perform all queued evictions
- for i := 0; i < len(mm.evict); i++ {
- mm.evict[i]()
+ if mm.count%mm.wake == 0 {
+ // Notify queued routines
+ for _, mu := range mm.queue {
+ mu.Unlock()
}
- // Notify all waiting goroutines
- for i := 0; i < len(mm.queue); i++ {
- mm.queue[i]()
+ // Reset queue
+ mm.queue = mm.queue[:0]
+ }
+
+ if mm.count < 1 {
+ // Perform evictions
+ for _, mu := range mm.evict {
+ key := mu.key
+ mu.key = ""
+ delete(mm.mumap, key)
+ mm.mpool.Release(mu)
}
- // Reset the map state
- mm.evict = nil
- mm.queue = nil
+ // Reset map state
+ mm.evict = mm.evict[:0]
mm.state = stateUnlockd
+ mm.mpool.GC()
+ mm.qpool.GC()
}
- // Finally, unlock
- mm.mapMu.Unlock()
+ // Unlock map
+ mm.mapmu.Unlock()
}
// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) RLockMap() *LockState {
- return mm.getMapLock(lockTypeRead)
+ mm.lockMap(lockTypeRead | lockTypeMap)
+ return &LockState{
+ mmap: mm,
+ ltyp: lockTypeRead,
+ }
}
// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) LockMap() *LockState {
- return mm.getMapLock(lockTypeWrite)
+ mm.lockMap(lockTypeWrite | lockTypeMap)
+ return &LockState{
+ mmap: mm,
+ ltyp: lockTypeWrite,
+ }
}
// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
func (mm *MutexMap) RLock(key string) (runlock func()) {
- return mm.getLock(key, lockTypeRead)
+ return mm.lock(key, lockTypeRead)
}
// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
func (mm *MutexMap) Lock(key string) (unlock func()) {
- return mm.getLock(key, lockTypeWrite)
-}
-
-// getLock will fetch lock of provided type, for given key, returning unlock function.
-func (mm *MutexMap) getLock(key string, lt locktype) func() {
- // Spin until achieve lock
- mm.spinLock(func() bool {
- return permitLockType(mm.state, lt) &&
- mm.count < mm.maxmu // not overloaded
- })
-
- // Perform actual mutex lock
- return mm.lockMutex(key, lt)
-}
-
-// getMapLock will acquire a map lock of provided type, returning a LockState session.
-func (mm *MutexMap) getMapLock(lt locktype) *LockState {
- // Spin until achieve lock
- mm.spinLock(func() bool {
- return permitLockType(mm.state, lt|lockTypeMap) &&
- mm.count < mm.maxmu // not overloaded
- })
-
- // Incr counter
- mm.count++
-
- switch {
- // Set read lock state
- case lt&lockTypeRead != 0:
- mm.state = stateRLocked
-
- // Set write lock state
- case lt&lockTypeWrite != 0:
- mm.state = stateLocked
-
- default:
- panic("unexpected lock type")
- }
-
- // Unlock + return
- mm.mapMu.Unlock()
- return &LockState{
- mmap: mm,
- ltyp: lt,
- }
+ return mm.lock(key, lockTypeWrite)
}
// LockState represents a window to a locked MutexMap.
@@ -267,56 +354,113 @@ type LockState struct {
wait sync.WaitGroup
mmap *MutexMap
done uint32
- ltyp locktype
+ ltyp uint8
}
// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
func (st *LockState) Lock(key string) (unlock func()) {
- return st.getLock(key, lockTypeWrite)
+ return st.lock(key, lockTypeWrite)
}
// RLock: see MutexMap.RLock() definition.
func (st *LockState) RLock(key string) (runlock func()) {
- return st.getLock(key, lockTypeRead)
+ return st.lock(key, lockTypeRead)
}
-// UnlockMap will close this state and release the currently locked map.
-func (st *LockState) UnlockMap() {
- // Set state to finished (or panic if already done)
- if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
- panic("called UnlockMap() on expired state")
- }
-
- // Wait until done
- st.wait.Wait()
-
- // Async reset map
- st.mmap.mapMu.Lock()
- go st.mmap.onUnlock()
-}
-
-// getLock: see MutexMap.getLock() definition.
-func (st *LockState) getLock(key string, lt locktype) func() {
+// lock: see MutexMap.lock() definition.
+func (st *LockState) lock(key string, lt uint8) func() {
st.wait.Add(1) // track lock
- // Check if closed, or if write lock is allowed
if atomic.LoadUint32(&st.done) == 1 {
- panic("map lock closed")
+ panic("called (r)lock on unlocked state")
} else if lt&lockTypeWrite != 0 &&
st.ltyp&lockTypeWrite == 0 {
- panic("called .Lock() on rlocked map")
+ panic("called lock on rlocked map")
}
- // Spin until achieve map lock
+ var ok bool
+ var mu *rwmutex
+
+ // Spin lock until returns true
st.mmap.spinLock(func() bool {
- return st.mmap.count < st.mmap.maxmu
- }) // i.e. not overloaded
+ // Check not overloaded
+ if !(st.mmap.count < st.mmap.maxmu) {
+ return false
+ }
+
+ // Ensure mutex at key
+ // is in lockable state
+ mu, ok = st.mmap.mumap[key]
+ return !ok || mu.CanLock(lt)
+ })
+
+ // Incr count
+ st.mmap.count++
+
+ if !ok {
+ // No mutex found for key
+
+ // Alloc from pool
+ v := st.mmap.mpool.Acquire()
+ mu = v.(*rwmutex)
+ st.mmap.mumap[key] = mu
+
+ // Set our key
+ mu.key = key
+
+ // Queue for eviction
+ st.mmap.evict = append(st.mmap.evict, mu)
+ }
- // Perform actual mutex lock
- unlock := st.mmap.lockMutex(key, lt)
+ // Lock mutex
+ mu.Lock(lt)
+
+ // Unlock map
+ st.mmap.mapmu.Unlock()
return func() {
- unlock()
- st.wait.Done()
+ st.mmap.mapmu.Lock()
+ mu.Unlock()
+ go st.mmap.cleanup()
+ st.wait.Add(-1)
+ }
+}
+
+// UnlockMap will close this state and release the currently locked map.
+func (st *LockState) UnlockMap() {
+ if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
+ panic("called unlockmap on expired state")
+ }
+ st.wait.Wait()
+ st.mmap.mapmu.Lock()
+ go st.mmap.cleanup()
+}
+
+// rwmutex is a very simple *representation* of a read-write
+// mutex, though not one in implementation. it works by
+// tracking the lock state for a given map key, which is
+// protected by the map's mutex.
+type rwmutex struct {
+ rcnt uint32
+ lock uint8
+ key string
+}
+
+func (mu *rwmutex) CanLock(lt uint8) bool {
+ return mu.lock == 0 ||
+ (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
+}
+
+func (mu *rwmutex) Lock(lt uint8) {
+ mu.lock = lt
+ if lt&lockTypeRead != 0 {
+ mu.rcnt++
+ }
+}
+
+func (mu *rwmutex) Unlock() {
+ mu.rcnt--
+ if mu.rcnt == 0 {
+ mu.lock = 0
}
}
diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex.go b/vendor/codeberg.org/gruf/go-mutexes/mutex.go
@@ -41,24 +41,24 @@ func WithFuncRW(mu RWMutex, onLock, onRLock, onUnlock, onRUnlock func()) RWMutex
}
// baseMutex simply wraps a sync.Mutex to implement our Mutex interface
-type baseMutex struct{ mu sync.Mutex }
+type baseMutex sync.Mutex
func (mu *baseMutex) Lock() func() {
- mu.mu.Lock()
- return mu.mu.Unlock
+ (*sync.Mutex)(mu).Lock()
+ return (*sync.Mutex)(mu).Unlock
}
// baseRWMutex simply wraps a sync.RWMutex to implement our RWMutex interface
-type baseRWMutex struct{ mu sync.RWMutex }
+type baseRWMutex sync.RWMutex
func (mu *baseRWMutex) Lock() func() {
- mu.mu.Lock()
- return mu.mu.Unlock
+ (*sync.RWMutex)(mu).Lock()
+ return (*sync.RWMutex)(mu).Unlock
}
func (mu *baseRWMutex) RLock() func() {
- mu.mu.RLock()
- return mu.mu.RUnlock
+ (*sync.RWMutex)(mu).RLock()
+ return (*sync.RWMutex)(mu).RUnlock
}
// fnMutex wraps a Mutex to add hooks for Lock and Unlock
diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go
@@ -1,6 +1,8 @@
package mutexes
-import "sync"
+import (
+ "sync/atomic"
+)
// WithSafety wrapps the supplied Mutex to protect unlock fns
// from being called multiple times
@@ -19,8 +21,7 @@ type safeMutex struct{ mu Mutex }
func (mu *safeMutex) Lock() func() {
unlock := mu.mu.Lock()
- once := sync.Once{}
- return func() { once.Do(unlock) }
+ return once(unlock)
}
// safeRWMutex simply wraps a RWMutex to add multi-unlock safety
@@ -28,12 +29,22 @@ type safeRWMutex struct{ mu RWMutex }
func (mu *safeRWMutex) Lock() func() {
unlock := mu.mu.Lock()
- once := sync.Once{}
- return func() { once.Do(unlock) }
+ return once(unlock)
}
func (mu *safeRWMutex) RLock() func() {
unlock := mu.mu.RLock()
- once := sync.Once{}
- return func() { once.Do(unlock) }
+ return once(unlock)
+}
+
+// once will perform 'do' only once, this is safe for unlocks
+// as 2 functions calling 'unlock()' don't need absolute guarantees
+// that by the time it is completed the unlock was finished.
+func once(do func()) func() {
+ var done uint32
+ return func() {
+ if atomic.CompareAndSwapUint32(&done, 0, 1) {
+ do()
+ }
+ }
}
diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go
@@ -97,7 +97,9 @@ func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {
// timerPool is the global &timer{} pool.
var timerPool = sync.Pool{
New: func() interface{} {
- return newtimer()
+ t := time.NewTimer(time.Minute)
+ t.Stop()
+ return &timer{t: t, c: make(chan struct{})}
},
}
@@ -107,13 +109,6 @@ type timer struct {
c chan struct{}
}
-// newtimer returns a new timer instance.
-func newtimer() *timer {
- t := time.NewTimer(time.Minute)
- t.Stop()
- return &timer{t: t, c: make(chan struct{})}
-}
-
// Start will start the timer with duration 'd', performing 'fn' on timeout.
func (t *timer) Start(d time.Duration, fn func()) {
t.t.Reset(d)
diff --git a/vendor/codeberg.org/gruf/go-mutexes/pool.go b/vendor/codeberg.org/gruf/go-mutexes/pool.go
@@ -0,0 +1,40 @@
+package mutexes
+
+// pool is a very simply memory pool.
+type pool struct {
+ current []interface{}
+ victim []interface{}
+ alloc func() interface{}
+}
+
+// Acquire will returns a sync.RWMutex from pool (or alloc new).
+func (p *pool) Acquire() interface{} {
+ // First try the current queue
+ if l := len(p.current) - 1; l >= 0 {
+ v := p.current[l]
+ p.current = p.current[:l]
+ return v
+ }
+
+ // Next try the victim queue.
+ if l := len(p.victim) - 1; l >= 0 {
+ v := p.victim[l]
+ p.victim = p.victim[:l]
+ return v
+ }
+
+ // Lastly, alloc new.
+ return p.alloc()
+}
+
+// Release places a sync.RWMutex back in the pool.
+func (p *pool) Release(v interface{}) {
+ p.current = append(p.current, v)
+}
+
+// GC will clear out unused entries from the pool.
+func (p *pool) GC() {
+ current := p.current
+ p.current = nil
+ p.victim = current
+}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go
@@ -45,7 +45,7 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Return new KVStore
return &KVStore{
- mutex: mutexes.NewMap(-1),
+ mutex: mutexes.NewMap(-1, -1),
storage: storage,
}, nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go
@@ -10,7 +10,7 @@ import (
"syscall"
"codeberg.org/gruf/go-bytes"
- "codeberg.org/gruf/go-pools"
+ "codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-store/util"
)
@@ -81,10 +81,10 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {
// DiskStorage is a Storage implementation that stores directly to a filesystem
type DiskStorage struct {
- path string // path is the root path of this store
- bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
- config DiskConfig // cfg is the supplied configuration for this store
- lock *Lock // lock is the opened lockfile for this storage instance
+ path string // path is the root path of this store
+ cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
+ config DiskConfig // cfg is the supplied configuration for this store
+ lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
@@ -147,13 +147,17 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
return nil, err
}
- // Return new DiskStorage
- return &DiskStorage{
+ // Prepare DiskStorage
+ st := &DiskStorage{
path: storePath,
- bufp: pools.NewBufferPool(config.WriteBufSize),
config: config,
lock: lock,
- }, nil
+ }
+
+ // Set copypool buffer size
+ st.cppool.Buffer(config.WriteBufSize)
+
+ return st, nil
}
// Clean implements Storage.Clean()
@@ -271,13 +275,8 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
}
defer cFile.Close()
- // Acquire write buffer
- buf := st.bufp.Get()
- defer st.bufp.Put(buf)
- buf.Grow(st.config.WriteBufSize)
-
- // Copy reader to file
- _, err = io.CopyBuffer(cFile, r, buf.B)
+ // Copy provided reader to file
+ _, err = st.cppool.Copy(cFile, r)
return err
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
@@ -4,6 +4,9 @@ codeberg.org/gruf/go-bytes
# codeberg.org/gruf/go-errors v1.0.5
## explicit; go 1.15
codeberg.org/gruf/go-errors
+# codeberg.org/gruf/go-fastcopy v1.1.1
+## explicit; go 1.17
+codeberg.org/gruf/go-fastcopy
# codeberg.org/gruf/go-fastpath v1.0.2
## explicit; go 1.14
codeberg.org/gruf/go-fastpath
@@ -13,7 +16,7 @@ codeberg.org/gruf/go-format
# codeberg.org/gruf/go-hashenc v1.0.1
## explicit; go 1.16
codeberg.org/gruf/go-hashenc
-# codeberg.org/gruf/go-mutexes v1.1.0
+# codeberg.org/gruf/go-mutexes v1.1.2
## explicit; go 1.14
codeberg.org/gruf/go-mutexes
# codeberg.org/gruf/go-pools v1.0.2
@@ -22,7 +25,7 @@ codeberg.org/gruf/go-pools
# codeberg.org/gruf/go-runners v1.2.0
## explicit; go 1.14
codeberg.org/gruf/go-runners
-# codeberg.org/gruf/go-store v1.3.3
+# codeberg.org/gruf/go-store v1.3.6
## explicit; go 1.14
codeberg.org/gruf/go-store/kv
codeberg.org/gruf/go-store/storage