add gruf worker pool

This commit is contained in:
tsmethurst 2022-01-03 17:37:09 +01:00
parent 2f57eb5ece
commit e08c0e55ee
9 changed files with 503 additions and 0 deletions

1
go.mod
View file

@ -3,6 +3,7 @@ module github.com/superseriousbusiness/gotosocial
go 1.17 go 1.17
require ( require (
codeberg.org/gruf/go-runners v1.2.0
codeberg.org/gruf/go-store v1.1.5 codeberg.org/gruf/go-store v1.1.5
github.com/ReneKroon/ttlcache v1.7.0 github.com/ReneKroon/ttlcache v1.7.0
github.com/buckket/go-blurhash v1.1.0 github.com/buckket/go-blurhash v1.1.0

2
go.sum
View file

@ -70,6 +70,8 @@ codeberg.org/gruf/go-nowish v1.1.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K
codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos= codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos=
codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4r4TAyok= codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4r4TAyok=
codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw= codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw=
codeberg.org/gruf/go-runners v1.2.0 h1:tkoPrwYMkVg1o/C4PGTR1YbC11XX4r06uLPOYajBsH4=
codeberg.org/gruf/go-runners v1.2.0/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw=
codeberg.org/gruf/go-store v1.1.5 h1:fp28vzGD15OsAF51CCwi7woH+Y3vb0aMl4OFh9JSjA0= codeberg.org/gruf/go-store v1.1.5 h1:fp28vzGD15OsAF51CCwi7woH+Y3vb0aMl4OFh9JSjA0=
codeberg.org/gruf/go-store v1.1.5/go.mod h1:Q6ev500ddKghDQ8KS4IstL/W9fptDKa2T9oeHP+tXsI= codeberg.org/gruf/go-store v1.1.5/go.mod h1:Q6ev500ddKghDQ8KS4IstL/W9fptDKa2T9oeHP+tXsI=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=

9
vendor/codeberg.org/gruf/go-runners/LICENSE generated vendored Normal file
View file

@ -0,0 +1,9 @@
MIT License
Copyright (c) 2021 gruf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

3
vendor/codeberg.org/gruf/go-runners/README.md generated vendored Normal file
View file

@ -0,0 +1,3 @@
# go-runners
Provides a means a simple means of managing long-running functions and services

36
vendor/codeberg.org/gruf/go-runners/context.go generated vendored Normal file
View file

@ -0,0 +1,36 @@
package runners
import (
"context"
"time"
)
// ContextWithCancel returns a new context.Context impl with cancel.
func ContextWithCancel() (context.Context, context.CancelFunc) {
ctx := make(cancelctx)
return ctx, func() { close(ctx) }
}
// cancelctx is the simplest possible cancellable context.
type cancelctx (chan struct{})
func (cancelctx) Deadline() (time.Time, bool) {
return time.Time{}, false
}
func (ctx cancelctx) Done() <-chan struct{} {
return ctx
}
func (ctx cancelctx) Err() error {
select {
case <-ctx:
return context.Canceled
default:
return nil
}
}
func (cancelctx) Value(key interface{}) interface{} {
return nil
}

160
vendor/codeberg.org/gruf/go-runners/pool.go generated vendored Normal file
View file

@ -0,0 +1,160 @@
package runners
import (
"context"
"sync"
)
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
// that implementations absolutely MUST check whether passed context is Done()
// otherwise stopping the pool may block for large periods of time.
type WorkerFunc func(context.Context)
// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
queue chan WorkerFunc
free chan struct{}
wait sync.WaitGroup
svc Service
}
// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size.
// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the
// queue size represents the max number of WorkerFuncs that can be queued at any one time.
func NewWorkerPool(workers int, queue int) WorkerPool {
return WorkerPool{
queue: make(chan WorkerFunc, queue),
free: make(chan struct{}, workers),
}
}
// Start will attempt to start the worker pool, asynchronously. Return is success state.
func (pool *WorkerPool) Start() bool {
ok := true
done := make(chan struct{})
go func() {
ok = pool.svc.Run(func(ctx context.Context) {
close(done)
pool.process(ctx)
})
if !ok {
close(done)
}
}()
<-done
return ok
}
// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state.
func (pool *WorkerPool) Stop() bool {
return pool.svc.Stop()
}
// Running returns whether the worker pool is running.
func (pool *WorkerPool) Running() bool {
return pool.svc.Running()
}
// execute will take a queued function and pass it to a free worker when available.
func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) {
// Set as running
pool.wait.Add(1)
select {
// Pool context cancelled
case <-ctx.Done():
pool.wait.Done()
// Free worker acquired
case pool.free <- struct{}{}:
}
go func() {
defer func() {
// defer in case panic
<-pool.free
pool.wait.Done()
}()
// Run queued
fn(ctx)
}()
}
// process is the background processing routine that passes queued functions to workers.
func (pool *WorkerPool) process(ctx context.Context) {
for {
select {
// Pool context cancelled
case <-ctx.Done():
for {
select {
// Pop and execute queued
case fn := <-pool.queue:
fn(ctx) // ctx is closed
// Empty, wait for workers
default:
pool.wait.Wait()
return
}
}
// Queued func received
case fn := <-pool.queue:
pool.execute(ctx, fn)
}
}
}
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
// Note that 'fn' will ALWAYS be executed, and the supplied context will specify whether this 'fn'
// is being executed during normal pool execution, or if the pool has been stopped with <-ctx.Done().
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// Check valid fn
if fn == nil {
return
}
select {
// Pool context cancelled
case <-pool.svc.Done():
// Placed fn in queue
case pool.queue <- fn:
}
}
// EnqueueNoBlock performs Enqueue but returns false if queue size is at max. Else, true.
func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
// Check valid fn
if fn == nil {
return false
}
select {
// Pool context cancelled
case <-pool.svc.Done():
return false
// Placed fn in queue
case pool.queue <- fn:
return true
// Queue is full
default:
return false
}
}
// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
return len(pool.queue)
}
// Workers returns the number of currently active workers.
func (pool *WorkerPool) Workers() int {
return len(pool.free)
}

130
vendor/codeberg.org/gruf/go-runners/run.go generated vendored Normal file
View file

@ -0,0 +1,130 @@
package runners
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// FuncRunner provides a means of managing long-running functions e.g. main logic loops.
type FuncRunner struct {
// HandOff is the time after which a blocking function will be considered handed off
HandOff time.Duration
// ErrorHandler is the function that errors are passed to when encountered by the
// provided function. This can be used both for logging, and for error filtering
ErrorHandler func(err error) error
svc Service // underlying service to manage start/stop
err error // last-set error
mu sync.Mutex // protects err
}
// Go will attempt to run 'fn' asynchronously. The provided context is used to propagate requested
// cancel if FuncRunner.Stop() is called. Any returned error will be passed to FuncRunner.ErrorHandler
// for filtering/logging/etc. Any blocking functions will be waited on for FuncRunner.HandOff amount of
// time before considering the function as handed off. Returned bool is success state, i.e. returns true
// if function is successfully handed off or returns within hand off time with nil error.
func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool {
done := make(chan struct{})
go func() {
var cancelled bool
has := r.svc.Run(func(ctx context.Context) {
// reset error
r.mu.Lock()
r.err = nil
r.mu.Unlock()
// Run supplied func and set errror if returned
if err := Run(func() error { return fn(ctx) }); err != nil {
r.mu.Lock()
r.err = err
r.mu.Unlock()
}
// signal done
close(done)
// Check if cancelled
select {
case <-ctx.Done():
cancelled = true
default:
cancelled = false
}
})
switch has {
// returned after starting
case true:
r.mu.Lock()
// filter out errors due FuncRunner.Stop() being called
if cancelled && errors.Is(r.err, context.Canceled) {
// filter out errors from FuncRunner.Stop() being called
r.err = nil
} else if r.err != nil && r.ErrorHandler != nil {
// pass any non-nil error to set handler
r.err = r.ErrorHandler(r.err)
}
r.mu.Unlock()
// already running
case false:
close(done)
}
}()
// get valid handoff to use
handoff := r.HandOff
if handoff < 1 {
handoff = time.Second * 5
}
select {
// handed off (long-run successful)
case <-time.After(handoff):
return true
// 'fn' returned, check error
case <-done:
return (r.Err() == nil)
}
}
// Stop will cancel the context supplied to the running function.
func (r *FuncRunner) Stop() bool {
return r.svc.Stop()
}
// Err returns the last-set error value.
func (r *FuncRunner) Err() error {
r.mu.Lock()
err := r.err
r.mu.Unlock()
return err
}
// Run will execute the supplied 'fn' catching any panics. Returns either function-returned error or formatted panic.
func Run(fn func() error) (err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(error); ok {
// wrap and preserve existing error
err = fmt.Errorf("caught panic: %w", e)
} else {
// simply create new error fromt iface
err = fmt.Errorf("caught panic: %v", r)
}
}
}()
// run supplied func
err = fn()
return
}

159
vendor/codeberg.org/gruf/go-runners/service.go generated vendored Normal file
View file

@ -0,0 +1,159 @@
package runners
import (
"context"
"sync"
)
// Service provides a means of tracking a single long-running service, provided protected state
// changes and preventing multiple instances running. Also providing service state information.
type Service struct {
state uint32 // 0=stopped, 1=running, 2=stopping
wait sync.Mutex // wait is the mutex used as a single-entity wait-group, i.e. just a "wait" :p
cncl context.CancelFunc // cncl is the cancel function set for the current context
ctx context.Context // ctx is the current context for running function (or nil if not running)
mu sync.Mutex // mu protects state changes
}
// Run will run the supplied function until completion, use given context to propagate cancel.
// Immediately returns false if the Service is already running, and true after completed run.
func (svc *Service) Run(fn func(context.Context)) bool {
// Attempt to start the svc
ctx, ok := svc.doStart()
if !ok {
return false
}
defer func() {
// unlock single wait
svc.wait.Unlock()
// ensure stopped
svc.Stop()
}()
// Run user func
if fn != nil {
fn(ctx)
}
return true
}
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
// Attempt to stop the svc
cncl, ok := svc.doStop()
if !ok {
return false
}
defer func() {
// Get svc lock
svc.mu.Lock()
// Wait until stopped
svc.wait.Lock()
svc.wait.Unlock()
// Reset the svc
svc.ctx = nil
svc.cncl = nil
svc.state = 0
svc.mu.Unlock()
}()
cncl() // cancel ctx
return true
}
// doStart will safely set Service state to started, returning a ptr to this context insance.
func (svc *Service) doStart() (context.Context, bool) {
// Protect startup
svc.mu.Lock()
if svc.state != 0 /* not stopped */ {
svc.mu.Unlock()
return nil, false
}
// state started
svc.state = 1
// Take our own ptr
var ctx context.Context
if svc.ctx == nil {
// Context required allocating
svc.ctx, svc.cncl = ContextWithCancel()
}
// Start the waiter
svc.wait.Lock()
// Set our ptr + unlock
ctx = svc.ctx
svc.mu.Unlock()
return ctx, true
}
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
func (svc *Service) doStop() (context.CancelFunc, bool) {
// Protect stop
svc.mu.Lock()
if svc.state != 1 /* not started */ {
svc.mu.Unlock()
return nil, false
}
// state stopping
svc.state = 2
// Take our own ptr
// and unlock state
cncl := svc.cncl
svc.mu.Unlock()
return cncl, true
}
// Running returns if Service is running (i.e. state NOT stopped / stopping).
func (svc *Service) Running() bool {
svc.mu.Lock()
state := svc.state
svc.mu.Unlock()
return (state == 1)
}
// Done returns a channel that's closed when Service.Stop() is called. It is
// the same channel provided to the currently running service function.
func (svc *Service) Done() <-chan struct{} {
var done <-chan struct{}
svc.mu.Lock()
switch svc.state {
// stopped
// (here we create a new context so that the
// returned 'done' channel here will still
// be valid for when Service is next started)
case 0:
if svc.ctx == nil {
// need to allocate new context
svc.ctx, svc.cncl = ContextWithCancel()
}
done = svc.ctx.Done()
// started
case 1:
done = svc.ctx.Done()
// stopping
case 2:
done = svc.ctx.Done()
}
svc.mu.Unlock()
return done
}

3
vendor/modules.txt vendored
View file

@ -22,6 +22,9 @@ codeberg.org/gruf/go-nowish
# codeberg.org/gruf/go-pools v1.0.2 # codeberg.org/gruf/go-pools v1.0.2
## explicit; go 1.16 ## explicit; go 1.16
codeberg.org/gruf/go-pools codeberg.org/gruf/go-pools
# codeberg.org/gruf/go-runners v1.2.0
## explicit; go 1.14
codeberg.org/gruf/go-runners
# codeberg.org/gruf/go-store v1.1.5 # codeberg.org/gruf/go-store v1.1.5
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-store/kv codeberg.org/gruf/go-store/kv