diff --git a/go.mod b/go.mod index 72744f2e1..e83d3966c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( codeberg.org/gruf/go-errors v1.0.5 codeberg.org/gruf/go-runners v1.2.0 - codeberg.org/gruf/go-store v1.3.3 + codeberg.org/gruf/go-store v1.3.6 github.com/ReneKroon/ttlcache v1.7.0 github.com/buckket/go-blurhash v1.1.0 github.com/coreos/go-oidc/v3 v3.1.0 @@ -49,10 +49,11 @@ require ( require ( codeberg.org/gruf/go-bytes v1.0.2 // indirect + codeberg.org/gruf/go-fastcopy v1.1.1 // indirect codeberg.org/gruf/go-fastpath v1.0.2 // indirect codeberg.org/gruf/go-format v1.0.3 // indirect codeberg.org/gruf/go-hashenc v1.0.1 // indirect - codeberg.org/gruf/go-mutexes v1.1.0 // indirect + codeberg.org/gruf/go-mutexes v1.1.2 // indirect codeberg.org/gruf/go-pools v1.0.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 4e92adf10..700c18a9d 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ codeberg.org/gruf/go-bytes v1.0.2/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9 codeberg.org/gruf/go-cache v1.1.2/go.mod h1:/Dbc+xU72Op3hMn6x2PXF3NE9uIDFeS+sXPF00hN/7o= codeberg.org/gruf/go-errors v1.0.5 h1:rxV70oQkfasUdggLHxOX2QAoJOMFM7XWxHQR45Zx/Fg= codeberg.org/gruf/go-errors v1.0.5/go.mod h1:n03EpmvcmfzU3/xJKC0XXtleXXJUNFpT2fgISODvZ1Y= +codeberg.org/gruf/go-fastcopy v1.1.1 h1:HhPCeFdVR5pwiSVDnQEGJ+J2ny9b5QgfiESc0zrWQAY= +codeberg.org/gruf/go-fastcopy v1.1.1/go.mod h1:GDDYR0Cnb3U/AIfGM3983V/L+GN+vuwVMvrmVABo21s= codeberg.org/gruf/go-fastpath v1.0.1/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI= codeberg.org/gruf/go-fastpath v1.0.2 h1:O3nuYPMXnN89dsgAwVFU5iCGINtPJdITWmbRe2an/iQ= codeberg.org/gruf/go-fastpath v1.0.2/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI= @@ -60,8 +62,8 @@ codeberg.org/gruf/go-format v1.0.3 h1:WoUGzTwZe6SIhILNvtr0qNIA7BOOCgdBlk5bUrfeii codeberg.org/gruf/go-format v1.0.3/go.mod h1:k3TLXp1dqAXdDqxlon0yEM+3FFHdNn0D6BVJTwTy5As= codeberg.org/gruf/go-hashenc v1.0.1 h1:EBvNe2wW8IPMUqT1XihB6/IM6KMJDLMFBxIUvmsy1f8= codeberg.org/gruf/go-hashenc v1.0.1/go.mod h1:IfHhPCVScOiYmJLqdCQT9bYVS1nxNTV4ewMUvFWDPtc= -codeberg.org/gruf/go-mutexes v1.1.0 h1:kMVWHLxdfGEZTetNVRncdBMeqS4M8dSJxSGbRYXyvKk= -codeberg.org/gruf/go-mutexes v1.1.0/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8= +codeberg.org/gruf/go-mutexes v1.1.2 h1:AMC1CFV6kMi+iBjR3yQv8yIagG3lWm68U6sQHYFHEf4= +codeberg.org/gruf/go-mutexes v1.1.2/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8= codeberg.org/gruf/go-nowish v1.0.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s= codeberg.org/gruf/go-nowish v1.1.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s= codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos= @@ -69,8 +71,8 @@ codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4 codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw= codeberg.org/gruf/go-runners v1.2.0 h1:tkoPrwYMkVg1o/C4PGTR1YbC11XX4r06uLPOYajBsH4= codeberg.org/gruf/go-runners v1.2.0/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw= -codeberg.org/gruf/go-store v1.3.3 h1:fAP9FXy6HiLPxdD7cmpSzyfKXmVvZLjqn0m7HhxVT5M= -codeberg.org/gruf/go-store v1.3.3/go.mod h1:g4+9h3wbwZ6IW0uhpw57xywcqiy4CIj0zQLqqtjEU1M= +codeberg.org/gruf/go-store v1.3.6 h1:OKzdvfUC+nvsWV5FiSKdk+85yvxF2Tb7K5ZtRqlDBDU= +codeberg.org/gruf/go-store v1.3.6/go.mod h1:a4vJtZf61UyrsejskX8q+s0lZeNGy7cJLUZt+fH00wo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/vendor/codeberg.org/gruf/go-fastcopy/README.md b/vendor/codeberg.org/gruf/go-fastcopy/README.md new file mode 100644 index 000000000..0c1ff68f7 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-fastcopy/README.md @@ -0,0 +1,3 @@ +# go-fastcopy + +An `io.Copy()` implementation that uses a memory pool for the copy buffer. \ No newline at end of file diff --git a/vendor/codeberg.org/gruf/go-fastcopy/copy.go b/vendor/codeberg.org/gruf/go-fastcopy/copy.go new file mode 100644 index 000000000..4716b140f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-fastcopy/copy.go @@ -0,0 +1,134 @@ +package fastcopy + +import ( + "io" + "sync" + _ "unsafe" // link to io.errInvalidWrite. +) + +var ( + // global pool instance. + pool = CopyPool{size: 4096} + + //go:linkname errInvalidWrite io.errInvalidWrite + errInvalidWrite error +) + +// CopyPool provides a memory pool of byte +// buffers for io copies from readers to writers. +type CopyPool struct { + size int + pool sync.Pool +} + +// See CopyPool.Buffer(). +func Buffer(sz int) int { + return pool.Buffer(sz) +} + +// See CopyPool.CopyN(). +func CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) { + return pool.CopyN(dst, src, n) +} + +// See CopyPool.Copy(). +func Copy(dst io.Writer, src io.Reader) (int64, error) { + return pool.Copy(dst, src) +} + +// Buffer sets the pool buffer size to allocate. Returns current size. +// Note this is NOT atomically safe, please call BEFORE other calls to CopyPool. +func (cp *CopyPool) Buffer(sz int) int { + if sz > 0 { + // update size + cp.size = sz + } else if cp.size < 1 { + // default size + return 4096 + } + return cp.size +} + +// CopyN performs the same logic as io.CopyN(), with the difference +// being that the byte buffer is acquired from a memory pool. +func (cp *CopyPool) CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) { + written, err := cp.Copy(dst, io.LimitReader(src, n)) + if written == n { + return n, nil + } + if written < n && err == nil { + // src stopped early; must have been EOF. + err = io.EOF + } + return written, err +} + +// Copy performs the same logic as io.Copy(), with the difference +// being that the byte buffer is acquired from a memory pool. +func (cp *CopyPool) Copy(dst io.Writer, src io.Reader) (int64, error) { + // Prefer using io.WriterTo to do the copy (avoids alloc + copy) + if wt, ok := src.(io.WriterTo); ok { + return wt.WriteTo(dst) + } + + // Prefer using io.ReaderFrom to do the copy. + if rt, ok := dst.(io.ReaderFrom); ok { + return rt.ReadFrom(src) + } + + var buf []byte + + if b, ok := cp.pool.Get().([]byte); ok { + // Acquired buf from pool + buf = b + } else { + // Allocate new buffer of size + buf = make([]byte, cp.Buffer(0)) + } + + // Defer release to pool + defer cp.pool.Put(buf) + + var n int64 + for { + // Perform next read into buf + nr, err := src.Read(buf) + if nr > 0 { + // We error check AFTER checking + // no. read bytes so incomplete + // read still gets written up to nr. + + // Perform next write from buf + nw, ew := dst.Write(buf[0:nr]) + + // Check for valid write + if nw < 0 || nr < nw { + if ew == nil { + ew = errInvalidWrite + } + return n, ew + } + + // Incr total count + n += int64(nw) + + // Check write error + if ew != nil { + return n, ew + } + + // Check unequal read/writes + if nr != nw { + return n, io.ErrShortWrite + } + } + + // Return on err + if err != nil { + if err == io.EOF { + err = nil // expected + } + return n, err + } + } +} diff --git a/vendor/codeberg.org/gruf/go-mutexes/debug.go b/vendor/codeberg.org/gruf/go-mutexes/debug.go new file mode 100644 index 000000000..1b7be60c7 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-mutexes/debug.go @@ -0,0 +1,39 @@ +package mutexes + +// func init() { +// log.SetFlags(log.Flags() | log.Lshortfile) +// } + +// type debugMutex sync.Mutex + +// func (mu *debugMutex) Lock() { +// log.Output(2, "Lock()") +// (*sync.Mutex)(mu).Lock() +// } + +// func (mu *debugMutex) Unlock() { +// log.Output(2, "Unlock()") +// (*sync.Mutex)(mu).Unlock() +// } + +// type debugRWMutex sync.RWMutex + +// func (mu *debugRWMutex) Lock() { +// log.Output(2, "Lock()") +// (*sync.RWMutex)(mu).Lock() +// } + +// func (mu *debugRWMutex) Unlock() { +// log.Output(2, "Unlock()") +// (*sync.RWMutex)(mu).Unlock() +// } + +// func (mu *debugRWMutex) RLock() { +// log.Output(2, "RLock()") +// (*sync.RWMutex)(mu).RLock() +// } + +// func (mu *debugRWMutex) RUnlock() { +// log.Output(2, "RUnlock()") +// (*sync.RWMutex)(mu).RUnlock() +// } diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index cb31a9543..c0f740eec 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -6,260 +6,347 @@ "sync/atomic" ) -// locktype defines maskable mutexmap lock types. -type locktype uint8 - const ( // possible lock types. - lockTypeRead = locktype(1) << 0 - lockTypeWrite = locktype(1) << 1 - lockTypeMap = locktype(1) << 2 + lockTypeRead = uint8(1) << 0 + lockTypeWrite = uint8(1) << 1 + lockTypeMap = uint8(1) << 2 // possible mutexmap states. stateUnlockd = uint8(0) stateRLocked = uint8(1) stateLocked = uint8(2) stateInUse = uint8(3) + + // default values. + defaultWake = 1024 ) -// permitLockType returns if provided locktype is permitted to go ahead in current state. -func permitLockType(state uint8, lt locktype) bool { +// acquireState attempts to acquire required map state for lockType. +func acquireState(state uint8, lt uint8) (uint8, bool) { switch state { // Unlocked state // (all allowed) case stateUnlockd: - return true // Keys locked, no state lock. // (don't allow map locks) case stateInUse: - return lt&lockTypeMap == 0 + if lt&lockTypeMap != 0 { + return 0, false + } // Read locked // (only allow read locks) case stateRLocked: - return lt&lockTypeRead != 0 + if lt&lockTypeRead == 0 { + return 0, false + } // Write locked // (none allowed) case stateLocked: - return false + return 0, false // shouldn't reach here default: panic("unexpected state") } -} - -// MutexMap is a structure that allows having a map of self-evicting mutexes -// by key. You do not need to worry about managing the contents of the map, -// only requesting RLock/Lock for keys, and ensuring to call the returned -// unlock functions. -type MutexMap struct { - mus map[string]RWMutex - mapMu sync.Mutex - pool sync.Pool - queue []func() - evict []func() - count int32 - maxmu int32 - state uint8 -} - -// NewMap returns a new MutexMap instance with provided max no. open mutexes. -func NewMap(max int32) MutexMap { - if max < 1 { - // Default = 128 * GOMAXPROCS - procs := runtime.GOMAXPROCS(0) - max = int32(procs * 128) - } - return MutexMap{ - mus: make(map[string]RWMutex), - pool: sync.Pool{ - New: func() interface{} { - return NewRW() - }, - }, - maxmu: max, - } -} - -// acquire will either acquire a mutex from pool or alloc. -func (mm *MutexMap) acquire() RWMutex { - return mm.pool.Get().(RWMutex) -} - -// release will release provided mutex to pool. -func (mm *MutexMap) release(mu RWMutex) { - mm.pool.Put(mu) -} - -// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true, -// returning with map lock. Note that 'cond' is performed within a map lock. -func (mm *MutexMap) spinLock(cond func() bool) { - mu := mm.acquire() - defer mm.release(mu) - - for { - // Get map lock - mm.mapMu.Lock() - - // Check if return - if cond() { - return - } - - // Queue ourselves - unlock := mu.Lock() - mm.queue = append(mm.queue, unlock) - mm.mapMu.Unlock() - - // Wait on notify - mu.Lock()() - } -} - -// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return. -func (mm *MutexMap) lockMutex(key string, lt locktype) func() { - var unlock func() - - // Incr counter - mm.count++ - - // Check for existing mutex at key - mu, ok := mm.mus[key] - if !ok { - // Alloc from pool - mu = mm.acquire() - mm.mus[key] = mu - - // Queue mutex for eviction - mm.evict = append(mm.evict, func() { - delete(mm.mus, key) - mm.pool.Put(mu) - }) - } - - // If no state, set in use. - // State will already have been - // set if this is from LockState{} - if mm.state == stateUnlockd { - mm.state = stateInUse - } switch { - // Read lock + // If unlocked and not a map + // lock request, set in use + case lt&lockTypeMap == 0: + if state == stateUnlockd { + state = stateInUse + } + + // Set read lock state case lt&lockTypeRead != 0: - unlock = mu.RLock() + state = stateRLocked - // Write lock + // Set write lock state case lt&lockTypeWrite != 0: - unlock = mu.Lock() + state = stateLocked - // shouldn't reach here default: panic("unexpected lock type") } - // Unlock map + return - mm.mapMu.Unlock() - return func() { - mm.mapMu.Lock() - unlock() - go mm.onUnlock() + return state, true +} + +// MutexMap is a structure that allows read / write locking key, performing +// as you'd expect a map[string]*sync.RWMutex to perform. The differences +// being that the entire map can itself be read / write locked, it uses memory +// pooling for the mutex (not quite) structures, and it is self-evicting. The +// core configurations of maximum no. open locks and wake modulus* are user +// definable. +// +// * The wake modulus is the number that the current number of open locks is +// modulused against to determine how often to notify sleeping goroutines. +// These are goroutines that are attempting to lock a key / whole map and are +// awaiting a permissible state (.e.g no key write locks allowed when the +// map is read locked). +type MutexMap struct { + qpool pool + queue []*sync.Mutex + + mumap map[string]*rwmutex + mpool pool + evict []*rwmutex + + count int32 + maxmu int32 + wake int32 + + mapmu sync.Mutex + state uint8 +} + +// NewMap returns a new MutexMap instance with provided max no. open mutexes. +func NewMap(max, wake int32) MutexMap { + // Determine wake mod. + if wake < 1 { + wake = defaultWake + } + + // Determine max no. mutexes + if max < 1 { + procs := runtime.GOMAXPROCS(0) + max = wake * int32(procs) + } + + return MutexMap{ + qpool: pool{ + alloc: func() interface{} { + return &sync.Mutex{} + }, + }, + mumap: make(map[string]*rwmutex, max), + mpool: pool{ + alloc: func() interface{} { + return &rwmutex{} + }, + }, + maxmu: max, + wake: wake, } } -// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex. -func (mm *MutexMap) onUnlock() { - // Decr counter - mm.count-- +// MAX sets the MutexMap max open locks and wake modulus, returns current values. +// For values less than zero defaults are set, and zero is non-op. +func (mm *MutexMap) SET(max, wake int32) (int32, int32) { + mm.mapmu.Lock() - if mm.count < 1 { - // Perform all queued evictions - for i := 0; i < len(mm.evict); i++ { - mm.evict[i]() - } + switch { + // Set default wake + case wake < 0: + mm.wake = defaultWake - // Notify all waiting goroutines - for i := 0; i < len(mm.queue); i++ { - mm.queue[i]() - } - - // Reset the map state - mm.evict = nil - mm.queue = nil - mm.state = stateUnlockd + // Set supplied wake + case wake > 0: + mm.wake = wake } - // Finally, unlock - mm.mapMu.Unlock() + switch { + // Set default max + case max < 0: + procs := runtime.GOMAXPROCS(0) + mm.maxmu = wake * int32(procs) + + // Set supplied max + case max > 0: + mm.maxmu = max + } + + // Fetch values + max = mm.maxmu + wake = mm.wake + + mm.mapmu.Unlock() + return max, wake +} + +// spinLock will wait (using a mutex to sleep thread) until conditional returns true. +func (mm *MutexMap) spinLock(cond func() bool) { + var mu *sync.Mutex + + for { + // Acquire map lock + mm.mapmu.Lock() + + if cond() { + // Release mu if needed + if mu != nil { + mm.qpool.Release(mu) + } + return + } + + // Alloc mu if needed + if mu == nil { + v := mm.qpool.Acquire() + mu = v.(*sync.Mutex) + } + + // Queue ourselves + mm.queue = append(mm.queue, mu) + mu.Lock() + + // Unlock map + mm.mapmu.Unlock() + + // Wait on notify + mu.Lock() + mu.Unlock() + } +} + +// lock will acquire a lock of given type on the 'mutex' at key. +func (mm *MutexMap) lock(key string, lt uint8) func() { + var ok bool + var mu *rwmutex + + // Spin lock until returns true + mm.spinLock(func() bool { + // Check not overloaded + if !(mm.count < mm.maxmu) { + return false + } + + // Attempt to acquire usable map state + state, ok := acquireState(mm.state, lt) + if !ok { + return false + } + + // Update state + mm.state = state + + // Ensure mutex at key + // is in lockable state + mu, ok = mm.mumap[key] + return !ok || mu.CanLock(lt) + }) + + // Incr count + mm.count++ + + if !ok { + // No mutex found for key + + // Alloc from pool + v := mm.mpool.Acquire() + mu = v.(*rwmutex) + mm.mumap[key] = mu + + // Set our key + mu.key = key + + // Queue for eviction + mm.evict = append(mm.evict, mu) + } + + // Lock mutex + mu.Lock(lt) + + // Unlock map + mm.mapmu.Unlock() + + return func() { + mm.mapmu.Lock() + mu.Unlock() + go mm.cleanup() + } +} + +// lockMap will lock the whole map under given lock type. +func (mm *MutexMap) lockMap(lt uint8) { + // Spin lock until returns true + mm.spinLock(func() bool { + // Attempt to acquire usable map state + state, ok := acquireState(mm.state, lt) + if !ok { + return false + } + + // Update state + mm.state = state + + return true + }) + + // Incr count + mm.count++ + + // State acquired, unlock + mm.mapmu.Unlock() +} + +// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map. +func (mm *MutexMap) cleanup() { + // Decr count + mm.count-- + + if mm.count%mm.wake == 0 { + // Notify queued routines + for _, mu := range mm.queue { + mu.Unlock() + } + + // Reset queue + mm.queue = mm.queue[:0] + } + + if mm.count < 1 { + // Perform evictions + for _, mu := range mm.evict { + key := mu.key + mu.key = "" + delete(mm.mumap, key) + mm.mpool.Release(mu) + } + + // Reset map state + mm.evict = mm.evict[:0] + mm.state = stateUnlockd + mm.mpool.GC() + mm.qpool.GC() + } + + // Unlock map + mm.mapmu.Unlock() } // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. func (mm *MutexMap) RLockMap() *LockState { - return mm.getMapLock(lockTypeRead) + mm.lockMap(lockTypeRead | lockTypeMap) + return &LockState{ + mmap: mm, + ltyp: lockTypeRead, + } } // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. func (mm *MutexMap) LockMap() *LockState { - return mm.getMapLock(lockTypeWrite) + mm.lockMap(lockTypeWrite | lockTypeMap) + return &LockState{ + mmap: mm, + ltyp: lockTypeWrite, + } } // RLock acquires a mutex read lock for supplied key, returning an RUnlock function. func (mm *MutexMap) RLock(key string) (runlock func()) { - return mm.getLock(key, lockTypeRead) + return mm.lock(key, lockTypeRead) } // Lock acquires a mutex write lock for supplied key, returning an Unlock function. func (mm *MutexMap) Lock(key string) (unlock func()) { - return mm.getLock(key, lockTypeWrite) -} - -// getLock will fetch lock of provided type, for given key, returning unlock function. -func (mm *MutexMap) getLock(key string, lt locktype) func() { - // Spin until achieve lock - mm.spinLock(func() bool { - return permitLockType(mm.state, lt) && - mm.count < mm.maxmu // not overloaded - }) - - // Perform actual mutex lock - return mm.lockMutex(key, lt) -} - -// getMapLock will acquire a map lock of provided type, returning a LockState session. -func (mm *MutexMap) getMapLock(lt locktype) *LockState { - // Spin until achieve lock - mm.spinLock(func() bool { - return permitLockType(mm.state, lt|lockTypeMap) && - mm.count < mm.maxmu // not overloaded - }) - - // Incr counter - mm.count++ - - switch { - // Set read lock state - case lt&lockTypeRead != 0: - mm.state = stateRLocked - - // Set write lock state - case lt&lockTypeWrite != 0: - mm.state = stateLocked - - default: - panic("unexpected lock type") - } - - // Unlock + return - mm.mapMu.Unlock() - return &LockState{ - mmap: mm, - ltyp: lt, - } + return mm.lock(key, lockTypeWrite) } // LockState represents a window to a locked MutexMap. @@ -267,56 +354,113 @@ type LockState struct { wait sync.WaitGroup mmap *MutexMap done uint32 - ltyp locktype + ltyp uint8 } // Lock: see MutexMap.Lock() definition. Will panic if map only read locked. func (st *LockState) Lock(key string) (unlock func()) { - return st.getLock(key, lockTypeWrite) + return st.lock(key, lockTypeWrite) } // RLock: see MutexMap.RLock() definition. func (st *LockState) RLock(key string) (runlock func()) { - return st.getLock(key, lockTypeRead) + return st.lock(key, lockTypeRead) +} + +// lock: see MutexMap.lock() definition. +func (st *LockState) lock(key string, lt uint8) func() { + st.wait.Add(1) // track lock + + if atomic.LoadUint32(&st.done) == 1 { + panic("called (r)lock on unlocked state") + } else if lt&lockTypeWrite != 0 && + st.ltyp&lockTypeWrite == 0 { + panic("called lock on rlocked map") + } + + var ok bool + var mu *rwmutex + + // Spin lock until returns true + st.mmap.spinLock(func() bool { + // Check not overloaded + if !(st.mmap.count < st.mmap.maxmu) { + return false + } + + // Ensure mutex at key + // is in lockable state + mu, ok = st.mmap.mumap[key] + return !ok || mu.CanLock(lt) + }) + + // Incr count + st.mmap.count++ + + if !ok { + // No mutex found for key + + // Alloc from pool + v := st.mmap.mpool.Acquire() + mu = v.(*rwmutex) + st.mmap.mumap[key] = mu + + // Set our key + mu.key = key + + // Queue for eviction + st.mmap.evict = append(st.mmap.evict, mu) + } + + // Lock mutex + mu.Lock(lt) + + // Unlock map + st.mmap.mapmu.Unlock() + + return func() { + st.mmap.mapmu.Lock() + mu.Unlock() + go st.mmap.cleanup() + st.wait.Add(-1) + } } // UnlockMap will close this state and release the currently locked map. func (st *LockState) UnlockMap() { - // Set state to finished (or panic if already done) if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { - panic("called UnlockMap() on expired state") + panic("called unlockmap on expired state") } - - // Wait until done st.wait.Wait() - - // Async reset map - st.mmap.mapMu.Lock() - go st.mmap.onUnlock() + st.mmap.mapmu.Lock() + go st.mmap.cleanup() } -// getLock: see MutexMap.getLock() definition. -func (st *LockState) getLock(key string, lt locktype) func() { - st.wait.Add(1) // track lock +// rwmutex is a very simple *representation* of a read-write +// mutex, though not one in implementation. it works by +// tracking the lock state for a given map key, which is +// protected by the map's mutex. +type rwmutex struct { + rcnt uint32 + lock uint8 + key string +} - // Check if closed, or if write lock is allowed - if atomic.LoadUint32(&st.done) == 1 { - panic("map lock closed") - } else if lt&lockTypeWrite != 0 && - st.ltyp&lockTypeWrite == 0 { - panic("called .Lock() on rlocked map") - } +func (mu *rwmutex) CanLock(lt uint8) bool { + return mu.lock == 0 || + (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0) +} - // Spin until achieve map lock - st.mmap.spinLock(func() bool { - return st.mmap.count < st.mmap.maxmu - }) // i.e. not overloaded - - // Perform actual mutex lock - unlock := st.mmap.lockMutex(key, lt) - - return func() { - unlock() - st.wait.Done() +func (mu *rwmutex) Lock(lt uint8) { + mu.lock = lt + if lt&lockTypeRead != 0 { + mu.rcnt++ + } +} + +func (mu *rwmutex) Unlock() { + mu.rcnt-- + if mu.rcnt == 0 { + mu.lock = 0 } } diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex.go b/vendor/codeberg.org/gruf/go-mutexes/mutex.go index c4f3f8876..3841c9423 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex.go @@ -41,24 +41,24 @@ func WithFuncRW(mu RWMutex, onLock, onRLock, onUnlock, onRUnlock func()) RWMutex } // baseMutex simply wraps a sync.Mutex to implement our Mutex interface -type baseMutex struct{ mu sync.Mutex } +type baseMutex sync.Mutex func (mu *baseMutex) Lock() func() { - mu.mu.Lock() - return mu.mu.Unlock + (*sync.Mutex)(mu).Lock() + return (*sync.Mutex)(mu).Unlock } // baseRWMutex simply wraps a sync.RWMutex to implement our RWMutex interface -type baseRWMutex struct{ mu sync.RWMutex } +type baseRWMutex sync.RWMutex func (mu *baseRWMutex) Lock() func() { - mu.mu.Lock() - return mu.mu.Unlock + (*sync.RWMutex)(mu).Lock() + return (*sync.RWMutex)(mu).Unlock } func (mu *baseRWMutex) RLock() func() { - mu.mu.RLock() - return mu.mu.RUnlock + (*sync.RWMutex)(mu).RLock() + return (*sync.RWMutex)(mu).RUnlock } // fnMutex wraps a Mutex to add hooks for Lock and Unlock diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go index 7a9747521..5a0383dce 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go @@ -1,6 +1,8 @@ package mutexes -import "sync" +import ( + "sync/atomic" +) // WithSafety wrapps the supplied Mutex to protect unlock fns // from being called multiple times @@ -19,8 +21,7 @@ type safeMutex struct{ func (mu *safeMutex) Lock() func() { unlock := mu.mu.Lock() - once := sync.Once{} - return func() { once.Do(unlock) } + return once(unlock) } // safeRWMutex simply wraps a RWMutex to add multi-unlock safety @@ -28,12 +29,22 @@ type safeRWMutex struct{ func (mu *safeRWMutex) Lock() func() { unlock := mu.mu.Lock() - once := sync.Once{} - return func() { once.Do(unlock) } + return once(unlock) } func (mu *safeRWMutex) RLock() func() { unlock := mu.mu.RLock() - once := sync.Once{} - return func() { once.Do(unlock) } + return once(unlock) +} + +// once will perform 'do' only once, this is safe for unlocks +// as 2 functions calling 'unlock()' don't need absolute guarantees +// that by the time it is completed the unlock was finished. +func once(do func()) func() { + var done uint32 + return func() { + if atomic.CompareAndSwapUint32(&done, 0, 1) { + do() + } + } } diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go index 2e7b8f802..03bf0e389 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go @@ -97,7 +97,9 @@ func mutexTimeout(d time.Duration, unlock func(), fn func()) func() { // timerPool is the global &timer{} pool. var timerPool = sync.Pool{ New: func() interface{} { - return newtimer() + t := time.NewTimer(time.Minute) + t.Stop() + return &timer{t: t, c: make(chan struct{})} }, } @@ -107,13 +109,6 @@ type timer struct { c chan struct{} } -// newtimer returns a new timer instance. -func newtimer() *timer { - t := time.NewTimer(time.Minute) - t.Stop() - return &timer{t: t, c: make(chan struct{})} -} - // Start will start the timer with duration 'd', performing 'fn' on timeout. func (t *timer) Start(d time.Duration, fn func()) { t.t.Reset(d) diff --git a/vendor/codeberg.org/gruf/go-mutexes/pool.go b/vendor/codeberg.org/gruf/go-mutexes/pool.go new file mode 100644 index 000000000..135e2c117 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-mutexes/pool.go @@ -0,0 +1,40 @@ +package mutexes + +// pool is a very simply memory pool. +type pool struct { + current []interface{} + victim []interface{} + alloc func() interface{} +} + +// Acquire will returns a sync.RWMutex from pool (or alloc new). +func (p *pool) Acquire() interface{} { + // First try the current queue + if l := len(p.current) - 1; l >= 0 { + v := p.current[l] + p.current = p.current[:l] + return v + } + + // Next try the victim queue. + if l := len(p.victim) - 1; l >= 0 { + v := p.victim[l] + p.victim = p.victim[:l] + return v + } + + // Lastly, alloc new. + return p.alloc() +} + +// Release places a sync.RWMutex back in the pool. +func (p *pool) Release(v interface{}) { + p.current = append(p.current, v) +} + +// GC will clear out unused entries from the pool. +func (p *pool) GC() { + current := p.current + p.current = nil + p.victim = current +} diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index a8741afe0..fd9935f25 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -45,7 +45,7 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) { // Return new KVStore return &KVStore{ - mutex: mutexes.NewMap(-1), + mutex: mutexes.NewMap(-1, -1), storage: storage, }, nil } diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go index 287042886..b3c480b3d 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -10,7 +10,7 @@ "syscall" "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-pools" + "codeberg.org/gruf/go-fastcopy" "codeberg.org/gruf/go-store/util" ) @@ -81,10 +81,10 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig { // DiskStorage is a Storage implementation that stores directly to a filesystem type DiskStorage struct { - path string // path is the root path of this store - bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage - config DiskConfig // cfg is the supplied configuration for this store - lock *Lock // lock is the opened lockfile for this storage instance + path string // path is the root path of this store + cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool + config DiskConfig // cfg is the supplied configuration for this store + lock *Lock // lock is the opened lockfile for this storage instance } // OpenFile opens a DiskStorage instance for given folder path and configuration @@ -147,13 +147,17 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { return nil, err } - // Return new DiskStorage - return &DiskStorage{ + // Prepare DiskStorage + st := &DiskStorage{ path: storePath, - bufp: pools.NewBufferPool(config.WriteBufSize), config: config, lock: lock, - }, nil + } + + // Set copypool buffer size + st.cppool.Buffer(config.WriteBufSize) + + return st, nil } // Clean implements Storage.Clean() @@ -271,13 +275,8 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error { } defer cFile.Close() - // Acquire write buffer - buf := st.bufp.Get() - defer st.bufp.Put(buf) - buf.Grow(st.config.WriteBufSize) - - // Copy reader to file - _, err = io.CopyBuffer(cFile, r, buf.B) + // Copy provided reader to file + _, err = st.cppool.Copy(cFile, r) return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3c273e88c..c28759538 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -4,6 +4,9 @@ codeberg.org/gruf/go-bytes # codeberg.org/gruf/go-errors v1.0.5 ## explicit; go 1.15 codeberg.org/gruf/go-errors +# codeberg.org/gruf/go-fastcopy v1.1.1 +## explicit; go 1.17 +codeberg.org/gruf/go-fastcopy # codeberg.org/gruf/go-fastpath v1.0.2 ## explicit; go 1.14 codeberg.org/gruf/go-fastpath @@ -13,7 +16,7 @@ codeberg.org/gruf/go-format # codeberg.org/gruf/go-hashenc v1.0.1 ## explicit; go 1.16 codeberg.org/gruf/go-hashenc -# codeberg.org/gruf/go-mutexes v1.1.0 +# codeberg.org/gruf/go-mutexes v1.1.2 ## explicit; go 1.14 codeberg.org/gruf/go-mutexes # codeberg.org/gruf/go-pools v1.0.2 @@ -22,7 +25,7 @@ codeberg.org/gruf/go-pools # codeberg.org/gruf/go-runners v1.2.0 ## explicit; go 1.14 codeberg.org/gruf/go-runners -# codeberg.org/gruf/go-store v1.3.3 +# codeberg.org/gruf/go-store v1.3.6 ## explicit; go 1.14 codeberg.org/gruf/go-store/kv codeberg.org/gruf/go-store/storage