From 285d55dda8b4de70661b16db4986d47e4e586ea2 Mon Sep 17 00:00:00 2001 From: Sam Lade Date: Sat, 16 Dec 2023 11:55:49 +0000 Subject: [PATCH] [feature] Push status edit messages into open streams (#2418) * push status edit messages into open streams * fix a few comments * test++ * commented out code? moi? --- internal/processing/stream/statusupdate.go | 38 ++++ .../processing/stream/statusupdate_test.go | 137 ++++++++++++++ internal/processing/stream/stream_test.go | 2 + internal/processing/workers/fromclientapi.go | 5 + internal/processing/workers/fromfediapi.go | 5 + .../processing/workers/surfacetimeline.go | 173 ++++++++++++++++++ internal/stream/stream.go | 3 + 7 files changed, 363 insertions(+) create mode 100644 internal/processing/stream/statusupdate.go create mode 100644 internal/processing/stream/statusupdate_test.go diff --git a/internal/processing/stream/statusupdate.go b/internal/processing/stream/statusupdate.go new file mode 100644 index 000000000..fd8e388ce --- /dev/null +++ b/internal/processing/stream/statusupdate.go @@ -0,0 +1,38 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package stream + +import ( + "encoding/json" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// StatusUpdate streams the given edited status to any open, appropriate +// streams belonging to the given account. +func (p *Processor) StatusUpdate(s *apimodel.Status, account *gtsmodel.Account, streamTypes []string) error { + bytes, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("error marshalling status to json: %s", err) + } + + return p.toAccount(string(bytes), stream.EventTypeStatusUpdate, streamTypes, account.ID) +} diff --git a/internal/processing/stream/statusupdate_test.go b/internal/processing/stream/statusupdate_test.go new file mode 100644 index 000000000..7b987b412 --- /dev/null +++ b/internal/processing/stream/statusupdate_test.go @@ -0,0 +1,137 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package stream_test + +import ( + "bytes" + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type StatusUpdateTestSuite struct { + StreamTestSuite +} + +func (suite *StatusUpdateTestSuite) TestStreamNotification() { + account := suite.testAccounts["local_account_1"] + + openStream, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user") + suite.NoError(errWithCode) + + editedStatus := suite.testStatuses["remote_account_1_status_1"] + apiStatus, err := typeutils.NewConverter(&suite.state).StatusToAPIStatus(context.Background(), editedStatus, account) + suite.NoError(err) + + err = suite.streamProcessor.StatusUpdate(apiStatus, account, []string{stream.TimelineHome}) + suite.NoError(err) + + msg := <-openStream.Messages + dst := new(bytes.Buffer) + err = json.Indent(dst, []byte(msg.Payload), "", " ") + suite.NoError(err) + suite.Equal(`{ + "id": "01FVW7JHQFSFK166WWKR8CBA6M", + "created_at": "2021-09-20T10:40:37.000Z", + "in_reply_to_id": null, + "in_reply_to_account_id": null, + "sensitive": false, + "spoiler_text": "", + "visibility": "unlisted", + "language": "en", + "uri": "http://fossbros-anonymous.io/users/foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M", + "url": "http://fossbros-anonymous.io/@foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M", + "replies_count": 0, + "reblogs_count": 0, + "favourites_count": 0, + "favourited": false, + "reblogged": false, + "muted": false, + "bookmarked": false, + "pinned": false, + "content": "dark souls status bot: \"thoughts of dog\"", + "reblog": null, + "account": { + "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX", + "username": "foss_satan", + "acct": "foss_satan@fossbros-anonymous.io", + "display_name": "big gerald", + "locked": false, + "discoverable": true, + "bot": false, + "created_at": "2021-09-26T10:52:36.000Z", + "note": "i post about like, i dunno, stuff, or whatever!!!!", + "url": "http://fossbros-anonymous.io/@foss_satan", + "avatar": "", + "avatar_static": "", + "header": "http://localhost:8080/assets/default_header.png", + "header_static": "http://localhost:8080/assets/default_header.png", + "followers_count": 0, + "following_count": 0, + "statuses_count": 3, + "last_status_at": "2021-09-11T09:40:37.000Z", + "emojis": [], + "fields": [] + }, + "media_attachments": [ + { + "id": "01FVW7RXPQ8YJHTEXYPE7Q8ZY0", + "type": "image", + "url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg", + "text_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg", + "preview_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/small/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg", + "remote_url": "http://fossbros-anonymous.io/attachments/original/13bbc3f8-2b5e-46ea-9531-40b4974d9912.jpg", + "preview_remote_url": "http://fossbros-anonymous.io/attachments/small/a499f55b-2d1e-4acd-98d2-1ac2ba6d79b9.jpg", + "meta": { + "original": { + "width": 472, + "height": 291, + "size": "472x291", + "aspect": 1.6219932 + }, + "small": { + "width": 472, + "height": 291, + "size": "472x291", + "aspect": 1.6219932 + }, + "focus": { + "x": 0, + "y": 0 + } + }, + "description": "tweet from thoughts of dog: i drank. all the water. in my bowl. earlier. but just now. i returned. to the same bowl. and it was. full again.. the bowl. is haunted", + "blurhash": "LARysgM_IU_3~pD%M_Rj_39FIAt6" + } + ], + "mentions": [], + "tags": [], + "emojis": [], + "card": null, + "poll": null +}`, dst.String()) + suite.Equal(msg.Event, "status.update") +} + +func TestStatusUpdateTestSuite(t *testing.T) { + suite.Run(t, &StatusUpdateTestSuite{}) +} diff --git a/internal/processing/stream/stream_test.go b/internal/processing/stream/stream_test.go index bd12674e7..2569ac701 100644 --- a/internal/processing/stream/stream_test.go +++ b/internal/processing/stream/stream_test.go @@ -30,6 +30,7 @@ type StreamTestSuite struct { suite.Suite testAccounts map[string]*gtsmodel.Account + testStatuses map[string]*gtsmodel.Status testTokens map[string]*gtsmodel.Token db db.DB oauthServer oauth.Server @@ -45,6 +46,7 @@ func (suite *StreamTestSuite) SetupTest() { testrig.InitTestConfig() suite.testAccounts = testrig.NewTestAccounts() + suite.testStatuses = testrig.NewTestStatuses() suite.testTokens = testrig.NewTestTokens() suite.db = testrig.NewTestDB(&suite.state) suite.state.DB = suite.db diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index e3f1e2d76..05b9acc1f 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -416,6 +416,11 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP } } + // Push message that the status has been edited to streams. + if err := p.surface.timelineStatusUpdate(ctx, status); err != nil { + log.Errorf(ctx, "error streaming status edit: %v", err) + } + return nil } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index d04e4ab8d..6dd4e543d 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -530,6 +530,11 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } } + // Push message that the status has been edited to streams. + if err := p.surface.timelineStatusUpdate(ctx, status); err != nil { + log.Errorf(ctx, "error streaming status edit: %v", err) + } + return nil } diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index baebdbc66..e63b8a7c0 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -390,3 +390,176 @@ func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID st Errorf("error unpreparing status from list timelines: %v", err) } } + +// timelineStatusUpdate looks up HOME and LIST timelines of accounts +// that follow the the status author and pushes edit messages into any +// active streams. +// Note that calling invalidateStatusFromTimelines takes care of the +// state in general, we just need to do this for any streams that are +// open right now. +func (s *surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Status) error { + // Ensure status fully populated; including account, mentions, etc. + if err := s.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("error populating status with id %s: %w", status.ID, err) + } + + // Get all local followers of the account that posted the status. + follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID) + if err != nil { + return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err) + } + + // If the poster is also local, add a fake entry for them + // so they can see their own status in their timeline. + if status.Account.IsLocal() { + follows = append(follows, >smodel.Follow{ + AccountID: status.AccountID, + Account: status.Account, + Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself. + ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs. + }) + } + + // Push to streams for each local follower of this account. + if err := s.timelineStatusUpdateForFollowers(ctx, status, follows); err != nil { + return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err) + } + + return nil +} + +// timelineStatusUpdateForFollowers iterates through the given +// slice of followers of the account that posted the given status, +// pushing update messages into open list/home streams of each +// follower. +func (s *surface) timelineStatusUpdateForFollowers( + ctx context.Context, + status *gtsmodel.Status, + follows []*gtsmodel.Follow, +) error { + var ( + errs gtserror.MultiError + ) + + for _, follow := range follows { + // Check to see if the status is timelineable for this follower, + // taking account of its visibility, who it replies to, and, if + // it's a reblog, whether follower account wants to see reblogs. + // + // If it's not timelineable, we can just stop early, since lists + // are prettymuch subsets of the home timeline, so if it shouldn't + // appear there, it shouldn't appear in lists either. + timelineable, err := s.filter.StatusHomeTimelineable( + ctx, follow.Account, status, + ) + if err != nil { + errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err) + continue + } + + if !timelineable { + // Nothing to do. + continue + } + + // Add status to any relevant lists + // for this follow, if applicable. + s.listTimelineStatusUpdateForFollow( + ctx, + status, + follow, + &errs, + ) + + // Add status to home timeline for owner + // of this follow, if applicable. + err = s.timelineStreamStatusUpdate( + ctx, + follow.Account, + status, + stream.TimelineHome, + ) + if err != nil { + errs.Appendf("error home timelining status: %w", err) + continue + } + } + + return errs.Combine() +} + +// listTimelineStatusUpdateForFollow pushes edits of the given status +// into any eligible lists streams opened by the given follower. +func (s *surface) listTimelineStatusUpdateForFollow( + ctx context.Context, + status *gtsmodel.Status, + follow *gtsmodel.Follow, + errs *gtserror.MultiError, +) { + // To put this status in appropriate list timelines, + // we need to get each listEntry that pertains to + // this follow. Then, we want to iterate through all + // those list entries, and add the status to the list + // that the entry belongs to if it meets criteria for + // inclusion in the list. + + // Get every list entry that targets this follow's ID. + listEntries, err := s.state.DB.GetListEntriesForFollowID( + // We only need the list IDs. + gtscontext.SetBarebones(ctx), + follow.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + errs.Appendf("error getting list entries: %w", err) + return + } + + // Check eligibility for each list entry (if any). + for _, listEntry := range listEntries { + eligible, err := s.listEligible(ctx, listEntry, status) + if err != nil { + errs.Appendf("error checking list eligibility: %w", err) + continue + } + + if !eligible { + // Don't add this. + continue + } + + // At this point we are certain this status + // should be included in the timeline of the + // list that this list entry belongs to. + if err := s.timelineStreamStatusUpdate( + ctx, + follow.Account, + status, + stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list + ); err != nil { + errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err) + // implicit continue + } + } +} + +// timelineStatusUpdate streams the edited status to the user using the +// given streamType. +func (s *surface) timelineStreamStatusUpdate( + ctx context.Context, + account *gtsmodel.Account, + status *gtsmodel.Status, + streamType string, +) error { + apiStatus, err := s.converter.StatusToAPIStatus(ctx, status, account) + if err != nil { + err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err) + return err + } + + if err := s.stream.StatusUpdate(apiStatus, account, []string{streamType}); err != nil { + err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err) + return err + } + + return nil +} diff --git a/internal/stream/stream.go b/internal/stream/stream.go index ae815e029..da5647433 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -26,6 +26,9 @@ EventTypeUpdate string = "update" // EventTypeDelete -- something should be deleted from a user EventTypeDelete string = "delete" + // EventTypeStatusUpdate -- something in the user's timeline has been edited + // (yes this is a confusing name, blame Mastodon) + EventTypeStatusUpdate string = "status.update" ) const (