diff --git a/internal/transport/delivery/delivery.go b/internal/transport/delivery/delivery.go index 5ae3e6cac..1e9126b2e 100644 --- a/internal/transport/delivery/delivery.go +++ b/internal/transport/delivery/delivery.go @@ -26,7 +26,6 @@ "codeberg.org/gruf/go-structr" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/httpclient" - "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/queue" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -181,8 +180,6 @@ func (w *Worker) run(ctx context.Context) { if w.Client == nil || w.Queue == nil { panic("not yet initialized") } - log.Debugf(ctx, "%p: starting worker", w) - defer log.Debugf(ctx, "%p: stopped worker", w) util.Must(func() { w.process(ctx) }) } diff --git a/internal/workers/worker_msg.go b/internal/workers/worker_msg.go index 0b43f5e07..92180651a 100644 --- a/internal/workers/worker_msg.go +++ b/internal/workers/worker_msg.go @@ -127,8 +127,6 @@ func (w *MsgWorker[T]) run(ctx context.Context) { if w.Process == nil || w.Queue == nil { panic("not yet initialized") } - log.Debugf(ctx, "%p: starting worker", w) - defer log.Debugf(ctx, "%p: stopped worker", w) util.Must(func() { w.process(ctx) }) } diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 3f4156841..306d9e635 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -21,6 +21,7 @@ "runtime" "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/scheduler" "github.com/superseriousbusiness/gotosocial/internal/transport/delivery" @@ -59,26 +60,54 @@ type Workers struct { // StartScheduler starts the job scheduler. func (w *Workers) StartScheduler() { _ = w.Scheduler.Start() // false = already running + log.Info(nil, "started scheduler") } // Start will start contained worker pools. func (w *Workers) Start() { + var n int + maxprocs := runtime.GOMAXPROCS(0) - w.Delivery.Start(deliveryWorkers(maxprocs)) - w.Client.Start(4 * maxprocs) - w.Federator.Start(4 * maxprocs) - w.Dereference.Start(4 * maxprocs) - w.Media.Start(8 * maxprocs) + + n = deliveryWorkers(maxprocs) + w.Delivery.Start(n) + log.Infof(nil, "started %d delivery workers", n) + + n = 4 * maxprocs + w.Client.Start(n) + log.Infof(nil, "started %d client workers", n) + + n = 4 * maxprocs + w.Federator.Start(n) + log.Infof(nil, "started %d federator workers", n) + + n = 4 * maxprocs + w.Dereference.Start(n) + log.Infof(nil, "started %d dereference workers", n) + + n = 8 * maxprocs + w.Media.Start(n) + log.Infof(nil, "started %d media workers", n) } // Stop will stop all of the contained worker pools (and global scheduler). func (w *Workers) Stop() { _ = w.Scheduler.Stop() // false = not running + w.Delivery.Stop() + log.Info(nil, "stopped delivery workers") + w.Client.Stop() + log.Info(nil, "stopped client workers") + w.Federator.Stop() + log.Info(nil, "stopped federator workers") + w.Dereference.Stop() + log.Info(nil, "stopped dereference workers") + w.Media.Stop() + log.Info(nil, "stopped media workers") } // nocopy when embedded will signal linter to