mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-12-27 09:36:31 +00:00
87cff71af9
* persist queued worker tasks to database on shutdown, fill worker queues from database on startup
* ensure the tasks are sorted by creation time before pushing them
* add migration to insert WorkerTask{} into database, add test for worker task persistence
* add test for recovering worker queues from database
* quick tweak
* whoops we ended up with double cleaner job scheduling
* insert each task separately, because bun is throwing some reflection error??
* add specific checking of cancelled worker contexts
* add http request signing to deliveries recovered from database
* add test for outgoing public key ID being correctly set on delivery
* replace select with Queue.PopCtx()
* get rid of loop now we don't use it
* remove field now we don't use it
* ensure that signing func is set
* header values weren't being copied over 🤦
* use ptr for httpclient.Request in delivery
* move worker queue filling to later in server init process
* fix rebase issues
* make logging less shouty
* use slices.Delete() instead of copying / reslicing
* have database return tasks in ascending order instead of sorting them
* add a 1 minute timeout to persisting worker queues
243 lines
6.2 KiB
Go
243 lines
6.2 KiB
Go
// GoToSocial
|
|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package transport
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
|
|
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
|
|
"github.com/superseriousbusiness/gotosocial/internal/config"
|
|
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
|
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
|
|
"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
|
|
)
|
|
|
|
func (t *transport) BatchDeliver(ctx context.Context, obj map[string]interface{}, recipients []*url.URL) error {
|
|
var (
|
|
// accumulated delivery reqs.
|
|
reqs []*delivery.Delivery
|
|
|
|
// accumulated preparation errs.
|
|
errs gtserror.MultiError
|
|
|
|
// Get current instance host info.
|
|
domain = config.GetAccountDomain()
|
|
host = config.GetHost()
|
|
)
|
|
|
|
// Marshal object as JSON.
|
|
b, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return gtserror.Newf("error marshaling json: %w", err)
|
|
}
|
|
|
|
// Extract object IDs.
|
|
actID := getActorID(obj)
|
|
objID := getObjectID(obj)
|
|
tgtID := getTargetID(obj)
|
|
|
|
for _, to := range recipients {
|
|
// Skip delivery to recipient if it is "us".
|
|
if to.Host == host || to.Host == domain {
|
|
continue
|
|
}
|
|
|
|
// Prepare http client request.
|
|
req, err := t.prepare(ctx,
|
|
actID,
|
|
objID,
|
|
tgtID,
|
|
b,
|
|
to,
|
|
)
|
|
if err != nil {
|
|
errs.Append(err)
|
|
continue
|
|
}
|
|
|
|
// Append to request queue.
|
|
reqs = append(reqs, req)
|
|
}
|
|
|
|
// Push prepared request list to the delivery queue.
|
|
t.controller.state.Workers.Delivery.Queue.Push(reqs...)
|
|
|
|
// Return combined err.
|
|
return errs.Combine()
|
|
}
|
|
|
|
func (t *transport) Deliver(ctx context.Context, obj map[string]interface{}, to *url.URL) error {
|
|
// if 'to' host is our own, skip as we don't need to deliver to ourselves...
|
|
if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
|
|
return nil
|
|
}
|
|
|
|
// Marshal object as JSON.
|
|
b, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return gtserror.Newf("error marshaling json: %w", err)
|
|
}
|
|
|
|
// Prepare http client request.
|
|
req, err := t.prepare(ctx,
|
|
getActorID(obj),
|
|
getObjectID(obj),
|
|
getTargetID(obj),
|
|
b,
|
|
to,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Push prepared request to the delivery queue.
|
|
t.controller.state.Workers.Delivery.Queue.Push(req)
|
|
|
|
return nil
|
|
}
|
|
|
|
// prepare will prepare a POST http.Request{}
|
|
// to recipient at 'to', wrapping in a queued
|
|
// request object with signing function.
|
|
func (t *transport) prepare(
|
|
ctx context.Context,
|
|
actorID string,
|
|
objectID string,
|
|
targetID string,
|
|
data []byte,
|
|
to *url.URL,
|
|
) (
|
|
*delivery.Delivery,
|
|
error,
|
|
) {
|
|
// Prepare POST signer.
|
|
sign := t.signPOST(data)
|
|
|
|
// Use *bytes.Reader for request body,
|
|
// as NewRequest() automatically will
|
|
// set .GetBody and content-length.
|
|
// (this handles necessary rewinding).
|
|
body := bytes.NewReader(data)
|
|
|
|
// Update to-be-used request context with signing details.
|
|
ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
|
|
ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
|
|
|
|
// Prepare a new request with data body directed at URL.
|
|
r, err := http.NewRequestWithContext(ctx, "POST", to.String(), body)
|
|
if err != nil {
|
|
return nil, gtserror.Newf("error preparing request: %w", err)
|
|
}
|
|
|
|
// Set our predefined controller user-agent.
|
|
r.Header.Set("User-Agent", t.controller.userAgent)
|
|
|
|
// Set the standard ActivityPub content-type + charset headers.
|
|
r.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
|
|
r.Header.Add("Accept-Charset", "utf-8")
|
|
|
|
// Validate the request before queueing for delivery.
|
|
if err := httpclient.ValidateRequest(r); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &delivery.Delivery{
|
|
ActorID: actorID,
|
|
ObjectID: objectID,
|
|
TargetID: targetID,
|
|
Request: httpclient.WrapRequest(r),
|
|
}, nil
|
|
}
|
|
|
|
func (t *transport) SignDelivery(dlv *delivery.Delivery) error {
|
|
if dlv.Request.GetBody == nil {
|
|
return gtserror.New("delivery request body not rewindable")
|
|
}
|
|
|
|
// Get a new copy of the request body.
|
|
body, err := dlv.Request.GetBody()
|
|
if err != nil {
|
|
return gtserror.Newf("error getting request body: %w", err)
|
|
}
|
|
|
|
// Read body data into memory.
|
|
data, err := io.ReadAll(body)
|
|
if err != nil {
|
|
return gtserror.Newf("error reading request body: %w", err)
|
|
}
|
|
|
|
// Get signing function for POST data.
|
|
// (note that delivery is ALWAYS POST).
|
|
sign := t.signPOST(data)
|
|
|
|
// Extract delivery context.
|
|
ctx := dlv.Request.Context()
|
|
|
|
// Update delivery request context with signing details.
|
|
ctx = gtscontext.SetOutgoingPublicKeyID(ctx, t.pubKeyID)
|
|
ctx = gtscontext.SetHTTPClientSignFunc(ctx, sign)
|
|
dlv.Request.Request = dlv.Request.Request.WithContext(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
// getObjectID extracts an object ID from 'serialized' ActivityPub object map.
// It returns an empty string if no "object" ID can be found.
func getObjectID(obj map[string]interface{}) string {
	return getIDFromField(obj, "object")
}

// getActorID extracts an actor ID from 'serialized' ActivityPub object map.
// It returns an empty string if no "actor" ID can be found.
func getActorID(obj map[string]interface{}) string {
	return getIDFromField(obj, "actor")
}

// getTargetID extracts a target ID from 'serialized' ActivityPub object map.
// It returns an empty string if no "target" ID can be found.
func getTargetID(obj map[string]interface{}) string {
	return getIDFromField(obj, "target")
}

// getIDFromField extracts the ID stored under 'key' in a 'serialized'
// ActivityPub object map. The value under 'key' may either be the ID
// string itself, or a nested map holding the ID under its "id" key.
// Any other shape (missing key, nil map, non-string "id", other value
// types) yields an empty string.
func getIDFromField(obj map[string]interface{}, key string) string {
	switch t := obj[key].(type) {
	case string:
		// Value is the bare ID.
		return t
	case map[string]interface{}:
		// Value is an embedded object; a missing or
		// non-string "id" deliberately falls back to "".
		id, _ := t["id"].(string)
		return id
	default:
		return ""
	}
}
|