mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-02-04 20:05:05 +00:00
93 lines
2.4 KiB
Go
93 lines
2.4 KiB
Go
package xsync
|
|
|
|
import (
|
|
"sync/atomic"
|
|
)
|
|
|
|
// A SPSCQueue is a bounded single-producer single-consumer concurrent
|
|
// queue. This means that not more than a single goroutine must be
|
|
// publishing items to the queue while not more than a single goroutine
|
|
// must be consuming those items.
|
|
//
|
|
// SPSCQueue instances must be created with NewSPSCQueue function.
|
|
// A SPSCQueue must not be copied after first use.
|
|
//
|
|
// Based on the data structure from the following article:
|
|
// https://rigtorp.se/ringbuffer/
|
|
type SPSCQueue struct {
|
|
cap uint64
|
|
pidx uint64
|
|
//lint:ignore U1000 prevents false sharing
|
|
pad0 [cacheLineSize - 8]byte
|
|
pcachedIdx uint64
|
|
//lint:ignore U1000 prevents false sharing
|
|
pad1 [cacheLineSize - 8]byte
|
|
cidx uint64
|
|
//lint:ignore U1000 prevents false sharing
|
|
pad2 [cacheLineSize - 8]byte
|
|
ccachedIdx uint64
|
|
//lint:ignore U1000 prevents false sharing
|
|
pad3 [cacheLineSize - 8]byte
|
|
items []interface{}
|
|
}
|
|
|
|
// NewSPSCQueue creates a new SPSCQueue instance with the given
|
|
// capacity.
|
|
func NewSPSCQueue(capacity int) *SPSCQueue {
|
|
if capacity < 1 {
|
|
panic("capacity must be positive number")
|
|
}
|
|
return &SPSCQueue{
|
|
cap: uint64(capacity + 1),
|
|
items: make([]interface{}, capacity+1),
|
|
}
|
|
}
|
|
|
|
// TryEnqueue inserts the given item into the queue. Does not block
|
|
// and returns immediately. The result indicates that the queue isn't
|
|
// full and the item was inserted.
|
|
func (q *SPSCQueue) TryEnqueue(item interface{}) bool {
|
|
// relaxed memory order would be enough here
|
|
idx := atomic.LoadUint64(&q.pidx)
|
|
nextIdx := idx + 1
|
|
if nextIdx == q.cap {
|
|
nextIdx = 0
|
|
}
|
|
cachedIdx := q.ccachedIdx
|
|
if nextIdx == cachedIdx {
|
|
cachedIdx = atomic.LoadUint64(&q.cidx)
|
|
q.ccachedIdx = cachedIdx
|
|
if nextIdx == cachedIdx {
|
|
return false
|
|
}
|
|
}
|
|
q.items[idx] = item
|
|
atomic.StoreUint64(&q.pidx, nextIdx)
|
|
return true
|
|
}
|
|
|
|
// TryDequeue retrieves and removes the item from the head of the
|
|
// queue. Does not block and returns immediately. The ok result
|
|
// indicates that the queue isn't empty and an item was retrieved.
|
|
func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) {
|
|
// relaxed memory order would be enough here
|
|
idx := atomic.LoadUint64(&q.cidx)
|
|
cachedIdx := q.pcachedIdx
|
|
if idx == cachedIdx {
|
|
cachedIdx = atomic.LoadUint64(&q.pidx)
|
|
q.pcachedIdx = cachedIdx
|
|
if idx == cachedIdx {
|
|
return
|
|
}
|
|
}
|
|
item = q.items[idx]
|
|
q.items[idx] = nil
|
|
ok = true
|
|
nextIdx := idx + 1
|
|
if nextIdx == q.cap {
|
|
nextIdx = 0
|
|
}
|
|
atomic.StoreUint64(&q.cidx, nextIdx)
|
|
return
|
|
}
|