gotosocial/internal/workers/workers.go
Vyr Cossont 5b765d734e
[feature] Push notifications (#3587)
* Update push subscription API model to be Mastodon 4.0 compatible

* Add webpush-go dependency

# Conflicts:
#	go.sum

* Single-row table for storing instance's VAPID key pair

* Generate VAPID key pair during startup

* Add VAPID public key to instance info API

* Return VAPID public key when registering an app

* Store Web Push subscriptions in DB

* Add Web Push sender (similar to email sender)

* Add no-op push senders to most processor tests

* Test Web Push notifications from workers

* Delete Web Push subscriptions when account is deleted

* Implement push subscription API

* Linter fixes

* Update Swagger

* Fix enum to int migration

* Fix GetVAPIDKeyPair

* Create web push subscriptions table with indexes

* Log Web Push server error messages

* Send instance URL as Web Push JWT subject

* Accept any 2xx code as a success

* Fix malformed VAPID sub claim

* Use packed notification flags

* Remove unused date columns

* Add notification type for update notifications

Not used yet

* Make GetVAPIDKeyPair idempotent

and remove PutVAPIDKeyPair

* Post-rebase fixes

* go mod tidy

* Special-case 4xx errors other than 408/429

Most client errors should remove the subscription.

* Improve titles, trim body to reasonable length

* Disallow cleartext HTTP for Web Push servers

* Fix lint

* Remove redundant index on unique column

Also removes redundant unique and notnull tags on ID column since these are implied by pk

* Make realsender.go more readable

* Use Tobi's style for wrapping errors

* Restore treating all 5xx codes as temporary problems

* Always load target account settings

* Stub `policy` and `standard`

* webpush.Sender: take type converter as ctor param

* Move webpush.MockSender and noopSender into testrig
2025-01-23 16:47:30 -08:00

145 lines
3.8 KiB
Go

// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package workers
import (
"runtime"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/scheduler"
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
)
// Workers bundles the task scheduler together with
// every worker pool declared in this package, so that
// they can be started and stopped as a single unit.
type Workers struct {
	// Scheduler is the main
	// task scheduler instance.
	Scheduler scheduler.Scheduler

	// Delivery is the worker pool responsible
	// for outgoing ActivityPub deliveries. It
	// embeds an (accessible) indexed queue
	// of Delivery{} objects.
	Delivery delivery.WorkerPool

	// Client is the worker pool for incoming
	// processing jobs from the client API.
	Client MsgWorkerPool[*messages.FromClientAPI]

	// Federator is the worker pool for incoming
	// processing jobs from the fedi API.
	Federator MsgWorkerPool[*messages.FromFediAPI]

	// Dereference is the worker pool for
	// asynchronous dereferencer jobs.
	Dereference FnWorkerPool

	// Processing is the worker pool for
	// asynchronous processing jobs such
	// as import tasks and admin tasks.
	Processing FnWorkerPool

	// WebPush is the worker pool that
	// delivers Web Push notifications.
	WebPush FnWorkerPool

	// Blank marker field: guards against
	// Workers being passed around by value.
	_ nocopy
}
// StartScheduler launches the main task scheduler
// and logs that it has been started.
func (w *Workers) StartScheduler() {
	// The result is deliberately discarded: a false
	// return only means the scheduler was already
	// running, which is not an error for our purposes.
	_ = w.Scheduler.Start()

	log.Info(nil, "started scheduler")
}
// Start spins up every worker pool held by Workers,
// sizing each one relative to the current GOMAXPROCS
// value, and logs the worker count of each pool.
//
// Pools are started in a fixed order: delivery, client,
// federator, dereference, processing, then Web Push.
func (w *Workers) Start() {
	// Passing 0 queries the current
	// setting without changing it.
	procs := runtime.GOMAXPROCS(0)

	// Delivery pool size depends on the
	// configured sender multiplier.
	senders := deliveryWorkers(procs)
	w.Delivery.Start(senders)
	log.Infof(nil, "started %d delivery workers", senders)

	// Client, federator and dereference
	// pools each get 4 workers per proc.
	quad := 4 * procs

	w.Client.Start(quad)
	log.Infof(nil, "started %d client workers", quad)

	w.Federator.Start(quad)
	log.Infof(nil, "started %d federator workers", quad)

	w.Dereference.Start(quad)
	log.Infof(nil, "started %d dereference workers", quad)

	// Processing and Web Push pools
	// each get 1 worker per proc.
	w.Processing.Start(procs)
	log.Infof(nil, "started %d processing workers", procs)

	w.WebPush.Start(procs)
	log.Infof(nil, "started %d Web Push workers", procs)
}
// Stop shuts down the scheduler followed by each
// worker pool held by Workers, logging after every
// individual shutdown call returns.
//
// Shutdown order mirrors startup: scheduler, delivery,
// client, federator, dereference, processing, Web Push.
func (w *Workers) Stop() {
	// The result is deliberately discarded: a false
	// return only means the scheduler was not running.
	_ = w.Scheduler.Stop()
	log.Info(nil, "stopped scheduler")

	w.Delivery.Stop()
	log.Info(nil, "stopped delivery workers")

	w.Client.Stop()
	log.Info(nil, "stopped client workers")

	w.Federator.Stop()
	log.Info(nil, "stopped federator workers")

	w.Dereference.Stop()
	log.Info(nil, "stopped dereference workers")

	w.Processing.Stop()
	log.Info(nil, "stopped processing workers")

	w.WebPush.Stop()
	log.Info(nil, "stopped WebPush workers")
}
// nocopy is an empty marker type. Placing it inside
// a struct is meant to make linters (e.g. go vet's
// copylocks check) report an error whenever the
// parent struct gets passed by value.
type nocopy struct{}

// Lock does nothing; it exists only so that
// *nocopy has the method set of a locker.
func (*nocopy) Lock() {
}

// Unlock does nothing; it exists only so that
// *nocopy has the method set of a locker.
func (*nocopy) Unlock() {
}
// deliveryWorkers calculates how many delivery workers
// should be started for the given maxprocs value: the
// configured advanced sender multiplier times maxprocs.
//
// A configured multiplier below 1 yields exactly
// 1 worker, regardless of the maxprocs value.
func deliveryWorkers(maxprocs int) int {
	if mult := config.GetAdvancedSenderMultiplier(); mult >= 1 {
		return mult * maxprocs
	}

	// Multiplier is zero or
	// negative: clamp to 1.
	return 1
}