[feature] Conversations API (#3013)

* Implement conversations API

* Sort and page conversations by last status ID

* Appease linter

* Fix deleting conversations and statuses

* Refactor to make migrations automatic

* Lint

* Update tests post-merge

* Fixes from live-fire testing

* Linter caught a format problem

* Refactor tests, fix cache

* Negative test for non-DMs

* Run conversations advanced migration on testrig startup as well as regular server startup

* Document (lack of) side effects of API method for deleting a conversation

* Make not-found check less nested for readability

* Rename PutConversation to UpsertConversation

* Use util.Ptr instead of IIFE

* Reduce cache used by conversations

* Remove unnecessary TableExpr/ColumnExpr

* Use struct tags for both unique constraints on Conversation

* Make it clear how paging with GetDirectStatusIDsBatch should be used

* Let conversation paging skip conversations it can't render

* Use Bun NewDropTable

* Convert delete raw query to Bun

* Convert update raw query to Bun

* Convert latestConversationStatusesTempTable raw query partially to Bun

* Convert conversationStatusesTempTable raw query partially to Bun

* Rename field used to store result of MaxDirectStatusID

* Move advanced migrations to their own tiny processor

* Catch up util function name with main

* Remove json.… wrappers

* Remove redundant check

* Combine error checks

* Replace map with slice of structs

* Address processor/type converter comments

- Add context info for errors
- Extract some common processor code into shared methods
- Move conversation eligibility check ahead of populating conversation

* Add error context when dropping temp tables
This commit is contained in:
Vyr Cossont 2024-07-23 12:44:31 -07:00 committed by GitHub
parent 31294f7c78
commit 8fdd358f4b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
55 changed files with 3317 additions and 143 deletions

View file

@ -290,6 +290,11 @@ func(context.Context, time.Time) {
return fmt.Errorf("error initializing metrics: %w", err) return fmt.Errorf("error initializing metrics: %w", err)
} }
// Run advanced migrations.
if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
return err
}
/* /*
HTTP router initialization HTTP router initialization
*/ */

View file

@ -204,6 +204,11 @@
return fmt.Errorf("error initializing metrics: %w", err) return fmt.Errorf("error initializing metrics: %w", err)
} }
// Run advanced migrations.
if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
return err
}
/* /*
HTTP router initialization HTTP router initialization
*/ */

View file

@ -6208,11 +6208,43 @@ paths:
- read:bookmarks - read:bookmarks
tags: tags:
- bookmarks - bookmarks
/api/v1/conversation/{id}/read:
post:
operationId: conversationRead
parameters:
- description: ID of the conversation.
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: Updated conversation.
schema:
$ref: '#/definitions/conversation'
"400":
description: bad request
"401":
description: unauthorized
"404":
description: not found
"406":
description: not acceptable
"422":
description: unprocessable content
"500":
description: internal server error
security:
- OAuth2 Bearer:
- write:conversations
summary: Mark a conversation with the given ID as read.
tags:
- conversations
/api/v1/conversations: /api/v1/conversations:
get: get:
description: |- description: |-
NOT IMPLEMENTED YET: Will currently always return an array of length 0.
The next and previous queries can be parsed from the returned Link header. The next and previous queries can be parsed from the returned Link header.
Example: Example:
@ -6221,15 +6253,15 @@ paths:
```` ````
operationId: conversationsGet operationId: conversationsGet
parameters: parameters:
- description: 'Return only conversations *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' - description: 'Return only conversations with last statuses *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query in: query
name: max_id name: max_id
type: string type: string
- description: 'Return only conversations *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' - description: 'Return only conversations with last statuses *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query in: query
name: since_id name: since_id
type: string type: string
- description: 'Return only conversations *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.' - description: 'Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query in: query
name: min_id name: min_id
type: string type: string
@ -6269,6 +6301,39 @@ paths:
summary: Get an array of (direct message) conversations that requesting account is involved in. summary: Get an array of (direct message) conversations that requesting account is involved in.
tags: tags:
- conversations - conversations
/api/v1/conversations/{id}:
delete:
description: |-
This doesn't delete the actual statuses in the conversation,
nor does it prevent a new conversation from being created later from the same thread and participants.
operationId: conversationDelete
parameters:
- description: ID of the conversation
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: conversation deleted
"400":
description: bad request
"401":
description: unauthorized
"404":
description: not found
"406":
description: not acceptable
"500":
description: internal server error
security:
- OAuth2 Bearer:
- write:conversations
summary: Delete a single conversation with the given ID.
tags:
- conversations
/api/v1/custom_emojis: /api/v1/custom_emojis:
get: get:
operationId: customEmojisGet operationId: customEmojisGet

View file

@ -0,0 +1,93 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"net/http"
"github.com/gin-gonic/gin"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
)
// ConversationDELETEHandler swagger:operation DELETE /api/v1/conversations/{id} conversationDelete
//
// Delete a single conversation with the given ID.
//
// This doesn't delete the actual statuses in the conversation,
// nor does it prevent a new conversation from being created later from the same thread and participants.
//
// ---
// tags:
// - conversations
//
// produces:
// - application/json
//
// parameters:
// -
// name: id
// type: string
// description: ID of the conversation
// in: path
// required: true
//
// security:
// - OAuth2 Bearer:
// - write:conversations
//
// responses:
// '200':
// description: conversation deleted
// '400':
// description: bad request
// '401':
// description: unauthorized
// '404':
// description: not found
// '406':
// description: not acceptable
// '500':
// description: internal server error
func (m *Module) ConversationDELETEHandler(c *gin.Context) {
authed, err := oauth.Authed(c, true, true, true, true)
if err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
return
}
if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1)
return
}
id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey))
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
errWithCode = m.processor.Conversations().Delete(c.Request.Context(), authed.Account, id)
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
c.JSON(http.StatusOK, apiutil.EmptyJSONObject)
}

View file

@ -0,0 +1,95 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"net/http"
"github.com/gin-gonic/gin"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
)
// ConversationReadPOSTHandler swagger:operation POST /api/v1/conversation/{id}/read conversationRead
//
// Mark a conversation with the given ID as read.
//
// ---
// tags:
// - conversations
//
// produces:
// - application/json
//
// parameters:
// -
// name: id
// in: path
// type: string
// required: true
// description: ID of the conversation.
//
// security:
// - OAuth2 Bearer:
// - write:conversations
//
// responses:
// '200':
// name: conversation
// description: Updated conversation.
// schema:
// "$ref": "#/definitions/conversation"
// '400':
// description: bad request
// '401':
// description: unauthorized
// '404':
// description: not found
// '406':
// description: not acceptable
// '422':
// description: unprocessable content
// '500':
// description: internal server error
func (m *Module) ConversationReadPOSTHandler(c *gin.Context) {
authed, err := oauth.Authed(c, true, true, true, true)
if err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
return
}
if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1)
return
}
id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey))
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
apiConversation, errWithCode := m.processor.Conversations().Read(c.Request.Context(), authed.Account, id)
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
apiutil.JSON(c, http.StatusOK, apiConversation)
}

View file

@ -21,13 +21,17 @@
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/processing" "github.com/superseriousbusiness/gotosocial/internal/processing"
) )
const ( const (
// BasePath is the base URI path for serving // BasePath is the base path for serving the conversations API, minus the 'api' prefix.
// conversations, minus the api prefix.
BasePath = "/v1/conversations" BasePath = "/v1/conversations"
// BasePathWithID is the base path with the ID key in it, for operations on an existing conversation.
BasePathWithID = BasePath + "/:" + apiutil.IDKey
// ReadPathWithID is the path for marking an existing conversation as read.
ReadPathWithID = BasePathWithID + "/read"
) )
type Module struct { type Module struct {
@ -42,4 +46,6 @@ func New(processor *processing.Processor) *Module {
func (m *Module) Route(attachHandler func(method string, path string, f ...gin.HandlerFunc) gin.IRoutes) { func (m *Module) Route(attachHandler func(method string, path string, f ...gin.HandlerFunc) gin.IRoutes) {
attachHandler(http.MethodGet, BasePath, m.ConversationsGETHandler) attachHandler(http.MethodGet, BasePath, m.ConversationsGETHandler)
attachHandler(http.MethodDelete, BasePathWithID, m.ConversationDELETEHandler)
attachHandler(http.MethodPost, ReadPathWithID, m.ConversationReadPOSTHandler)
} }

View file

@ -24,14 +24,13 @@
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/paging"
) )
// ConversationsGETHandler swagger:operation GET /api/v1/conversations conversationsGet // ConversationsGETHandler swagger:operation GET /api/v1/conversations conversationsGet
// //
// Get an array of (direct message) conversations that requesting account is involved in. // Get an array of (direct message) conversations that requesting account is involved in.
// //
// NOT IMPLEMENTED YET: Will currently always return an array of length 0.
//
// The next and previous queries can be parsed from the returned Link header. // The next and previous queries can be parsed from the returned Link header.
// Example: // Example:
// //
@ -51,26 +50,26 @@
// name: max_id // name: max_id
// type: string // type: string
// description: >- // description: >-
// Return only conversations *OLDER* than the given max ID. // Return only conversations with last statuses *OLDER* than the given max ID.
// The conversation with the specified ID will not be included in the response. // The conversation with the specified ID will not be included in the response.
// NOTE: the ID is of the internal conversation, use the Link header for pagination. // NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query // in: query
// required: false // required: false
// - // -
// name: since_id // name: since_id
// type: string // type: string
// description: >- // description: >-
// Return only conversations *NEWER* than the given since ID. // Return only conversations with last statuses *NEWER* than the given since ID.
// The conversation with the specified ID will not be included in the response. // The conversation with the specified ID will not be included in the response.
// NOTE: the ID is of the internal conversation, use the Link header for pagination. // NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query // in: query
// - // -
// name: min_id // name: min_id
// type: string // type: string
// description: >- // description: >-
// Return only conversations *IMMEDIATELY NEWER* than the given min ID. // Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID.
// The conversation with the specified ID will not be included in the response. // The conversation with the specified ID will not be included in the response.
// NOTE: the ID is of the internal conversation, use the Link header for pagination. // NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query // in: query
// required: false // required: false
// - // -
@ -108,7 +107,8 @@
// '500': // '500':
// description: internal server error // description: internal server error
func (m *Module) ConversationsGETHandler(c *gin.Context) { func (m *Module) ConversationsGETHandler(c *gin.Context) {
if _, err := oauth.Authed(c, true, true, true, true); err != nil { authed, err := oauth.Authed(c, true, true, true, true)
if err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1) apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
return return
} }
@ -118,5 +118,29 @@ func (m *Module) ConversationsGETHandler(c *gin.Context) {
return return
} }
apiutil.Data(c, http.StatusOK, apiutil.AppJSON, apiutil.EmptyJSONArray) page, errWithCode := paging.ParseIDPage(c,
1, // min limit
80, // max limit
40, // default limit
)
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
resp, errWithCode := m.processor.Conversations().GetAll(
c.Request.Context(),
authed.Account,
page,
)
if errWithCode != nil {
apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
return
}
if resp.LinkHeader != "" {
c.Header("Link", resp.LinkHeader)
}
apiutil.JSON(c, http.StatusOK, resp.Items)
} }

View file

@ -60,6 +60,8 @@ func (c *Caches) Init() {
c.initBlockIDs() c.initBlockIDs()
c.initBoostOfIDs() c.initBoostOfIDs()
c.initClient() c.initClient()
c.initConversation()
c.initConversationLastStatusIDs()
c.initDomainAllow() c.initDomainAllow()
c.initDomainBlock() c.initDomainBlock()
c.initEmoji() c.initEmoji()

52
internal/cache/db.go vendored
View file

@ -56,6 +56,12 @@ type GTSCaches struct {
// Client provides access to the gtsmodel Client database cache. // Client provides access to the gtsmodel Client database cache.
Client StructCache[*gtsmodel.Client] Client StructCache[*gtsmodel.Client]
// Conversation provides access to the gtsmodel Conversation database cache.
Conversation StructCache[*gtsmodel.Conversation]
// ConversationLastStatusIDs provides access to the conversation last status IDs database cache.
ConversationLastStatusIDs SliceCache[string]
// DomainAllow provides access to the domain allow database cache. // DomainAllow provides access to the domain allow database cache.
DomainAllow *domain.Cache DomainAllow *domain.Cache
@ -426,6 +432,52 @@ func (c *Caches) initClient() {
}) })
} }
func (c *Caches) initConversation() {
cap := calculateResultCacheMax(
sizeofConversation(), // model in-mem size.
config.GetCacheConversationMemRatio(),
)
log.Infof(nil, "cache size = %d", cap)
copyF := func(c1 *gtsmodel.Conversation) *gtsmodel.Conversation {
c2 := new(gtsmodel.Conversation)
*c2 = *c1
// Don't include ptr fields that
// will be populated separately.
// See internal/db/bundb/conversation.go.
c2.Account = nil
c2.OtherAccounts = nil
c2.LastStatus = nil
return c2
}
c.GTS.Conversation.Init(structr.CacheConfig[*gtsmodel.Conversation]{
Indices: []structr.IndexConfig{
{Fields: "ID"},
{Fields: "ThreadID,AccountID,OtherAccountsKey"},
{Fields: "AccountID,LastStatusID"},
{Fields: "AccountID", Multiple: true},
},
MaxSize: cap,
IgnoreErr: ignoreErrors,
Copy: copyF,
Invalidate: c.OnInvalidateConversation,
})
}
func (c *Caches) initConversationLastStatusIDs() {
cap := calculateSliceCacheMax(
config.GetCacheConversationLastStatusIDsMemRatio(),
)
log.Infof(nil, "cache size = %d", cap)
c.GTS.ConversationLastStatusIDs.Init(0, cap)
}
func (c *Caches) initDomainAllow() { func (c *Caches) initDomainAllow() {
c.GTS.DomainAllow = new(domain.Cache) c.GTS.DomainAllow = new(domain.Cache)
} }

View file

@ -83,6 +83,11 @@ func (c *Caches) OnInvalidateClient(client *gtsmodel.Client) {
c.GTS.Token.Invalidate("ClientID", client.ID) c.GTS.Token.Invalidate("ClientID", client.ID)
} }
func (c *Caches) OnInvalidateConversation(conversation *gtsmodel.Conversation) {
// Invalidate owning account's conversation list.
c.GTS.ConversationLastStatusIDs.Invalidate(conversation.AccountID)
}
func (c *Caches) OnInvalidateEmojiCategory(category *gtsmodel.EmojiCategory) { func (c *Caches) OnInvalidateEmojiCategory(category *gtsmodel.EmojiCategory) {
// Invalidate any emoji in this category. // Invalidate any emoji in this category.
c.GTS.Emoji.Invalidate("CategoryID", category.ID) c.GTS.Emoji.Invalidate("CategoryID", category.ID)

View file

@ -19,6 +19,7 @@
import ( import (
"crypto/rsa" "crypto/rsa"
"strings"
"time" "time"
"unsafe" "unsafe"
@ -320,6 +321,20 @@ func sizeofClient() uintptr {
})) }))
} }
func sizeofConversation() uintptr {
return uintptr(size.Of(&gtsmodel.Conversation{
ID: exampleID,
CreatedAt: exampleTime,
UpdatedAt: exampleTime,
AccountID: exampleID,
OtherAccountIDs: []string{exampleID, exampleID, exampleID},
OtherAccountsKey: strings.Join([]string{exampleID, exampleID, exampleID}, ","),
ThreadID: exampleID,
LastStatusID: exampleID,
Read: util.Ptr(true),
}))
}
func sizeofEmoji() uintptr { func sizeofEmoji() uintptr {
return uintptr(size.Of(&gtsmodel.Emoji{ return uintptr(size.Of(&gtsmodel.Emoji{
ID: exampleID, ID: exampleID,

View file

@ -158,6 +158,34 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string)
}) })
} }
// LoadIDs2Part works as LoadIDs, except using a two-part key,
// where the first part is an ID shared by all the objects,
// and the second part is a list of per-object IDs.
func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, load func(string, []string) ([]T, error)) ([]T, error) {
i := c.index[index]
if i == nil {
// we only perform this check here as
// we're going to use the index before
// passing it to cache in main .Load().
panic("missing index for cache type")
}
// Generate cache keys for two-part IDs.
keys := make([]structr.Key, len(id2s))
for x, id2 := range id2s {
keys[x] = i.Key(id1, id2)
}
// Pass loader callback with wrapper onto main cache load function.
return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
uncachedIDs := make([]string, len(uncached))
for i := range uncached {
uncachedIDs[i] = uncached[i].Values()[1].(string)
}
return load(id1, uncachedIDs)
})
}
// Store: see structr.Cache{}.Store(). // Store: see structr.Cache{}.Store().
func (c *StructCache[T]) Store(value T, store func() error) error { func (c *StructCache[T]) Store(value T, store func() error) error {
return c.cache.Store(value, store) return c.cache.Store(value, store)

View file

@ -191,53 +191,55 @@ type HTTPClientConfiguration struct {
} }
type CacheConfiguration struct { type CacheConfiguration struct {
MemoryTarget bytesize.Size `name:"memory-target"` MemoryTarget bytesize.Size `name:"memory-target"`
AccountMemRatio float64 `name:"account-mem-ratio"` AccountMemRatio float64 `name:"account-mem-ratio"`
AccountNoteMemRatio float64 `name:"account-note-mem-ratio"` AccountNoteMemRatio float64 `name:"account-note-mem-ratio"`
AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"` AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"`
AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"` AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"`
ApplicationMemRatio float64 `name:"application-mem-ratio"` ApplicationMemRatio float64 `name:"application-mem-ratio"`
BlockMemRatio float64 `name:"block-mem-ratio"` BlockMemRatio float64 `name:"block-mem-ratio"`
BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"` BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"`
BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"` BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"`
ClientMemRatio float64 `name:"client-mem-ratio"` ClientMemRatio float64 `name:"client-mem-ratio"`
EmojiMemRatio float64 `name:"emoji-mem-ratio"` ConversationMemRatio float64 `name:"conversation-mem-ratio"`
EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"` ConversationLastStatusIDsMemRatio float64 `name:"conversation-last-status-ids-mem-ratio"`
FilterMemRatio float64 `name:"filter-mem-ratio"` EmojiMemRatio float64 `name:"emoji-mem-ratio"`
FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"` EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"`
FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"` FilterMemRatio float64 `name:"filter-mem-ratio"`
FollowMemRatio float64 `name:"follow-mem-ratio"` FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"`
FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"` FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"`
FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"` FollowMemRatio float64 `name:"follow-mem-ratio"`
FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"` FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"`
InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"` FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"`
InstanceMemRatio float64 `name:"instance-mem-ratio"` FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"`
InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"` InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"`
ListMemRatio float64 `name:"list-mem-ratio"` InstanceMemRatio float64 `name:"instance-mem-ratio"`
ListEntryMemRatio float64 `name:"list-entry-mem-ratio"` InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"`
MarkerMemRatio float64 `name:"marker-mem-ratio"` ListMemRatio float64 `name:"list-mem-ratio"`
MediaMemRatio float64 `name:"media-mem-ratio"` ListEntryMemRatio float64 `name:"list-entry-mem-ratio"`
MentionMemRatio float64 `name:"mention-mem-ratio"` MarkerMemRatio float64 `name:"marker-mem-ratio"`
MoveMemRatio float64 `name:"move-mem-ratio"` MediaMemRatio float64 `name:"media-mem-ratio"`
NotificationMemRatio float64 `name:"notification-mem-ratio"` MentionMemRatio float64 `name:"mention-mem-ratio"`
PollMemRatio float64 `name:"poll-mem-ratio"` MoveMemRatio float64 `name:"move-mem-ratio"`
PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"` NotificationMemRatio float64 `name:"notification-mem-ratio"`
PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"` PollMemRatio float64 `name:"poll-mem-ratio"`
ReportMemRatio float64 `name:"report-mem-ratio"` PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"`
StatusMemRatio float64 `name:"status-mem-ratio"` PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"`
StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"` ReportMemRatio float64 `name:"report-mem-ratio"`
StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"` StatusMemRatio float64 `name:"status-mem-ratio"`
StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"` StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"`
StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"` StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"`
TagMemRatio float64 `name:"tag-mem-ratio"` StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"`
ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"` StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"`
TokenMemRatio float64 `name:"token-mem-ratio"` TagMemRatio float64 `name:"tag-mem-ratio"`
TombstoneMemRatio float64 `name:"tombstone-mem-ratio"` ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"`
UserMemRatio float64 `name:"user-mem-ratio"` TokenMemRatio float64 `name:"token-mem-ratio"`
UserMuteMemRatio float64 `name:"user-mute-mem-ratio"` TombstoneMemRatio float64 `name:"tombstone-mem-ratio"`
UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"` UserMemRatio float64 `name:"user-mem-ratio"`
WebfingerMemRatio float64 `name:"webfinger-mem-ratio"` UserMuteMemRatio float64 `name:"user-mute-mem-ratio"`
VisibilityMemRatio float64 `name:"visibility-mem-ratio"` UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"`
WebfingerMemRatio float64 `name:"webfinger-mem-ratio"`
VisibilityMemRatio float64 `name:"visibility-mem-ratio"`
} }
// MarshalMap will marshal current Configuration into a map structure (useful for JSON/TOML/YAML). // MarshalMap will marshal current Configuration into a map structure (useful for JSON/TOML/YAML).

View file

@ -156,52 +156,54 @@
// when TODO items in the size.go source // when TODO items in the size.go source
// file have been addressed, these should // file have been addressed, these should
// be able to make some more sense :D // be able to make some more sense :D
AccountMemRatio: 5, AccountMemRatio: 5,
AccountNoteMemRatio: 1, AccountNoteMemRatio: 1,
AccountSettingsMemRatio: 0.1, AccountSettingsMemRatio: 0.1,
AccountStatsMemRatio: 2, AccountStatsMemRatio: 2,
ApplicationMemRatio: 0.1, ApplicationMemRatio: 0.1,
BlockMemRatio: 2, BlockMemRatio: 2,
BlockIDsMemRatio: 3, BlockIDsMemRatio: 3,
BoostOfIDsMemRatio: 3, BoostOfIDsMemRatio: 3,
ClientMemRatio: 0.1, ClientMemRatio: 0.1,
EmojiMemRatio: 3, ConversationMemRatio: 1,
EmojiCategoryMemRatio: 0.1, ConversationLastStatusIDsMemRatio: 2,
FilterMemRatio: 0.5, EmojiMemRatio: 3,
FilterKeywordMemRatio: 0.5, EmojiCategoryMemRatio: 0.1,
FilterStatusMemRatio: 0.5, FilterMemRatio: 0.5,
FollowMemRatio: 2, FilterKeywordMemRatio: 0.5,
FollowIDsMemRatio: 4, FilterStatusMemRatio: 0.5,
FollowRequestMemRatio: 2, FollowMemRatio: 2,
FollowRequestIDsMemRatio: 2, FollowIDsMemRatio: 4,
InReplyToIDsMemRatio: 3, FollowRequestMemRatio: 2,
InstanceMemRatio: 1, FollowRequestIDsMemRatio: 2,
InteractionApprovalMemRatio: 1, InReplyToIDsMemRatio: 3,
ListMemRatio: 1, InstanceMemRatio: 1,
ListEntryMemRatio: 2, InteractionApprovalMemRatio: 1,
MarkerMemRatio: 0.5, ListMemRatio: 1,
MediaMemRatio: 4, ListEntryMemRatio: 2,
MentionMemRatio: 2, MarkerMemRatio: 0.5,
MoveMemRatio: 0.1, MediaMemRatio: 4,
NotificationMemRatio: 2, MentionMemRatio: 2,
PollMemRatio: 1, MoveMemRatio: 0.1,
PollVoteMemRatio: 2, NotificationMemRatio: 2,
PollVoteIDsMemRatio: 2, PollMemRatio: 1,
ReportMemRatio: 1, PollVoteMemRatio: 2,
StatusMemRatio: 5, PollVoteIDsMemRatio: 2,
StatusBookmarkMemRatio: 0.5, ReportMemRatio: 1,
StatusBookmarkIDsMemRatio: 2, StatusMemRatio: 5,
StatusFaveMemRatio: 2, StatusBookmarkMemRatio: 0.5,
StatusFaveIDsMemRatio: 3, StatusBookmarkIDsMemRatio: 2,
TagMemRatio: 2, StatusFaveMemRatio: 2,
ThreadMuteMemRatio: 0.2, StatusFaveIDsMemRatio: 3,
TokenMemRatio: 0.75, TagMemRatio: 2,
TombstoneMemRatio: 0.5, ThreadMuteMemRatio: 0.2,
UserMemRatio: 0.25, TokenMemRatio: 0.75,
UserMuteMemRatio: 2, TombstoneMemRatio: 0.5,
UserMuteIDsMemRatio: 3, UserMemRatio: 0.25,
WebfingerMemRatio: 0.1, UserMuteMemRatio: 2,
VisibilityMemRatio: 2, UserMuteIDsMemRatio: 3,
WebfingerMemRatio: 0.1,
VisibilityMemRatio: 2,
}, },
HTTPClient: HTTPClientConfiguration{ HTTPClient: HTTPClientConfiguration{

View file

@ -2975,6 +2975,62 @@ func GetCacheClientMemRatio() float64 { return global.GetCacheClientMemRatio() }
// SetCacheClientMemRatio safely sets the value for global configuration 'Cache.ClientMemRatio' field // SetCacheClientMemRatio safely sets the value for global configuration 'Cache.ClientMemRatio' field
func SetCacheClientMemRatio(v float64) { global.SetCacheClientMemRatio(v) } func SetCacheClientMemRatio(v float64) { global.SetCacheClientMemRatio(v) }
// GetCacheConversationMemRatio safely fetches the Configuration value for state's 'Cache.ConversationMemRatio' field
func (st *ConfigState) GetCacheConversationMemRatio() (v float64) {
st.mutex.RLock()
v = st.config.Cache.ConversationMemRatio
st.mutex.RUnlock()
return
}
// SetCacheConversationMemRatio safely sets the Configuration value for state's 'Cache.ConversationMemRatio' field
func (st *ConfigState) SetCacheConversationMemRatio(v float64) {
st.mutex.Lock()
defer st.mutex.Unlock()
st.config.Cache.ConversationMemRatio = v
st.reloadToViper()
}
// CacheConversationMemRatioFlag returns the flag name for the 'Cache.ConversationMemRatio' field
func CacheConversationMemRatioFlag() string { return "cache-conversation-mem-ratio" }
// GetCacheConversationMemRatio safely fetches the value for global configuration 'Cache.ConversationMemRatio' field
func GetCacheConversationMemRatio() float64 { return global.GetCacheConversationMemRatio() }
// SetCacheConversationMemRatio safely sets the value for global configuration 'Cache.ConversationMemRatio' field
func SetCacheConversationMemRatio(v float64) { global.SetCacheConversationMemRatio(v) }
// GetCacheConversationLastStatusIDsMemRatio safely fetches the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field
func (st *ConfigState) GetCacheConversationLastStatusIDsMemRatio() (v float64) {
st.mutex.RLock()
v = st.config.Cache.ConversationLastStatusIDsMemRatio
st.mutex.RUnlock()
return
}
// SetCacheConversationLastStatusIDsMemRatio safely sets the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field
func (st *ConfigState) SetCacheConversationLastStatusIDsMemRatio(v float64) {
st.mutex.Lock()
defer st.mutex.Unlock()
st.config.Cache.ConversationLastStatusIDsMemRatio = v
st.reloadToViper()
}
// CacheConversationLastStatusIDsMemRatioFlag returns the flag name for the 'Cache.ConversationLastStatusIDsMemRatio' field
func CacheConversationLastStatusIDsMemRatioFlag() string {
return "cache-conversation-last-status-ids-mem-ratio"
}
// GetCacheConversationLastStatusIDsMemRatio safely fetches the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field
func GetCacheConversationLastStatusIDsMemRatio() float64 {
return global.GetCacheConversationLastStatusIDsMemRatio()
}
// SetCacheConversationLastStatusIDsMemRatio safely sets the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field
func SetCacheConversationLastStatusIDsMemRatio(v float64) {
global.SetCacheConversationLastStatusIDsMemRatio(v)
}
// GetCacheEmojiMemRatio safely fetches the Configuration value for state's 'Cache.EmojiMemRatio' field // GetCacheEmojiMemRatio safely fetches the Configuration value for state's 'Cache.EmojiMemRatio' field
func (st *ConfigState) GetCacheEmojiMemRatio() (v float64) { func (st *ConfigState) GetCacheEmojiMemRatio() (v float64) {
st.mutex.RLock() st.mutex.RLock()

View file

@ -0,0 +1,29 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package db
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
type AdvancedMigration interface {
GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error)
PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error
}

View file

@ -0,0 +1,52 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package bundb
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/uptrace/bun"
)
type advancedMigrationDB struct {
db *bun.DB
state *state.State
}
func (a *advancedMigrationDB) GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error) {
var advancedMigration gtsmodel.AdvancedMigration
err := a.db.NewSelect().
Model(&advancedMigration).
Where("? = ?", bun.Ident("id"), id).
Limit(1).
Scan(ctx)
if err != nil {
return nil, err
}
return &advancedMigration, nil
}
func (a *advancedMigrationDB) PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error {
_, err := NewUpsert(a.db).
Model(advancedMigration).
Constraint("id").
Exec(ctx)
return err
}

View file

@ -54,8 +54,10 @@
type DBService struct { type DBService struct {
db.Account db.Account
db.Admin db.Admin
db.AdvancedMigration
db.Application db.Application
db.Basic db.Basic
db.Conversation
db.Domain db.Domain
db.Emoji db.Emoji
db.HeaderFilter db.HeaderFilter
@ -158,6 +160,7 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
// https://bun.uptrace.dev/orm/many-to-many-relation/ // https://bun.uptrace.dev/orm/many-to-many-relation/
for _, t := range []interface{}{ for _, t := range []interface{}{
&gtsmodel.AccountToEmoji{}, &gtsmodel.AccountToEmoji{},
&gtsmodel.ConversationToStatus{},
&gtsmodel.StatusToEmoji{}, &gtsmodel.StatusToEmoji{},
&gtsmodel.StatusToTag{}, &gtsmodel.StatusToTag{},
&gtsmodel.ThreadToStatus{}, &gtsmodel.ThreadToStatus{},
@ -181,6 +184,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
db: db, db: db,
state: state, state: state,
}, },
AdvancedMigration: &advancedMigrationDB{
db: db,
state: state,
},
Application: &applicationDB{ Application: &applicationDB{
db: db, db: db,
state: state, state: state,
@ -188,6 +195,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
Basic: &basicDB{ Basic: &basicDB{
db: db, db: db,
}, },
Conversation: &conversationDB{
db: db,
state: state,
},
Domain: &domainDB{ Domain: &domainDB{
db: db, db: db,
state: state, state: state,

View file

@ -0,0 +1,494 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package bundb
import (
"context"
"errors"
"slices"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/paging"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/util"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect"
)
type conversationDB struct {
db *bun.DB
state *state.State
}
func (c *conversationDB) GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error) {
return c.getConversation(
ctx,
"ID",
func(conversation *gtsmodel.Conversation) error {
return c.db.
NewSelect().
Model(conversation).
Where("? = ?", bun.Ident("id"), id).
Scan(ctx)
},
id,
)
}
func (c *conversationDB) GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error) {
otherAccountsKey := gtsmodel.ConversationOtherAccountsKey(otherAccountIDs)
return c.getConversation(
ctx,
"ThreadID,AccountID,OtherAccountsKey",
func(conversation *gtsmodel.Conversation) error {
return c.db.
NewSelect().
Model(conversation).
Where("? = ?", bun.Ident("thread_id"), threadID).
Where("? = ?", bun.Ident("account_id"), accountID).
Where("? = ?", bun.Ident("other_accounts_key"), otherAccountsKey).
Scan(ctx)
},
threadID,
accountID,
otherAccountsKey,
)
}
func (c *conversationDB) getConversation(
ctx context.Context,
lookup string,
dbQuery func(conversation *gtsmodel.Conversation) error,
keyParts ...any,
) (*gtsmodel.Conversation, error) {
// Fetch conversation from cache with loader callback
conversation, err := c.state.Caches.GTS.Conversation.LoadOne(lookup, func() (*gtsmodel.Conversation, error) {
var conversation gtsmodel.Conversation
// Not cached! Perform database query
if err := dbQuery(&conversation); err != nil {
return nil, err
}
return &conversation, nil
}, keyParts...)
if err != nil {
// already processe
return nil, err
}
if gtscontext.Barebones(ctx) {
// Only a barebones model was requested.
return conversation, nil
}
if err := c.populateConversation(ctx, conversation); err != nil {
return nil, err
}
return conversation, nil
}
func (c *conversationDB) populateConversation(ctx context.Context, conversation *gtsmodel.Conversation) error {
var (
errs gtserror.MultiError
err error
)
if conversation.Account == nil {
conversation.Account, err = c.state.DB.GetAccountByID(
gtscontext.SetBarebones(ctx),
conversation.AccountID,
)
if err != nil {
errs.Appendf("error populating conversation owner account: %w", err)
}
}
if conversation.OtherAccounts == nil {
conversation.OtherAccounts, err = c.state.DB.GetAccountsByIDs(
gtscontext.SetBarebones(ctx),
conversation.OtherAccountIDs,
)
if err != nil {
errs.Appendf("error populating other conversation accounts: %w", err)
}
}
if conversation.LastStatus == nil && conversation.LastStatusID != "" {
conversation.LastStatus, err = c.state.DB.GetStatusByID(
gtscontext.SetBarebones(ctx),
conversation.LastStatusID,
)
if err != nil {
errs.Appendf("error populating conversation last status: %w", err)
}
}
return errs.Combine()
}
func (c *conversationDB) GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error) {
conversationLastStatusIDs, err := c.getAccountConversationLastStatusIDs(ctx, accountID, page)
if err != nil {
return nil, err
}
return c.getConversationsByLastStatusIDs(ctx, accountID, conversationLastStatusIDs)
}
func (c *conversationDB) getAccountConversationLastStatusIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
return loadPagedIDs(&c.state.Caches.GTS.ConversationLastStatusIDs, accountID, page, func() ([]string, error) {
var conversationLastStatusIDs []string
// Conversation last status IDs not in cache. Perform DB query.
if _, err := c.db.
NewSelect().
Model((*gtsmodel.Conversation)(nil)).
Column("last_status_id").
Where("? = ?", bun.Ident("account_id"), accountID).
OrderExpr("? DESC", bun.Ident("last_status_id")).
Exec(ctx, &conversationLastStatusIDs); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, err
}
return conversationLastStatusIDs, nil
})
}
func (c *conversationDB) getConversationsByLastStatusIDs(
ctx context.Context,
accountID string,
conversationLastStatusIDs []string,
) ([]*gtsmodel.Conversation, error) {
// Load all conversation IDs via cache loader callbacks.
conversations, err := c.state.Caches.GTS.Conversation.LoadIDs2Part(
"AccountID,LastStatusID",
accountID,
conversationLastStatusIDs,
func(accountID string, uncached []string) ([]*gtsmodel.Conversation, error) {
// Preallocate expected length of uncached conversations.
conversations := make([]*gtsmodel.Conversation, 0, len(uncached))
// Perform database query scanning the remaining (uncached) IDs.
if err := c.db.NewSelect().
Model(&conversations).
Where("? = ?", bun.Ident("account_id"), accountID).
Where("? IN (?)", bun.Ident("last_status_id"), bun.In(uncached)).
Scan(ctx); err != nil {
return nil, err
}
return conversations, nil
},
)
if err != nil {
return nil, err
}
// Reorder the conversations by their last status IDs to ensure correct order.
getID := func(b *gtsmodel.Conversation) string { return b.ID }
util.OrderBy(conversations, conversationLastStatusIDs, getID)
if gtscontext.Barebones(ctx) {
// no need to fully populate.
return conversations, nil
}
// Populate all loaded conversations, removing those we fail to populate.
conversations = slices.DeleteFunc(conversations, func(conversation *gtsmodel.Conversation) bool {
if err := c.populateConversation(ctx, conversation); err != nil {
log.Errorf(ctx, "error populating conversation %s: %v", conversation.ID, err)
return true
}
return false
})
return conversations, nil
}
func (c *conversationDB) UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error {
// If we're updating by column, ensure "updated_at" is included.
if len(columns) > 0 {
columns = append(columns, "updated_at")
}
return c.state.Caches.GTS.Conversation.Store(conversation, func() error {
_, err := NewUpsert(c.db).
Model(conversation).
Constraint("id").
Column(columns...).
Exec(ctx)
return err
})
}
func (c *conversationDB) LinkConversationToStatus(ctx context.Context, conversationID string, statusID string) error {
conversationToStatus := &gtsmodel.ConversationToStatus{
ConversationID: conversationID,
StatusID: statusID,
}
if _, err := c.db.NewInsert().
Model(conversationToStatus).
Exec(ctx); // nocollapse
err != nil {
return err
}
return nil
}
func (c *conversationDB) DeleteConversationByID(ctx context.Context, id string) error {
// Load conversation into cache before attempting a delete,
// as we need it cached in order to trigger the invalidate
// callback. This in turn invalidates others.
_, err := c.GetConversationByID(gtscontext.SetBarebones(ctx), id)
if err != nil {
if errors.Is(err, db.ErrNoEntries) {
// not an issue.
err = nil
}
return err
}
// Drop this now-cached conversation on return after delete.
defer c.state.Caches.GTS.Conversation.Invalidate("ID", id)
// Finally delete conversation from DB.
_, err = c.db.NewDelete().
Model((*gtsmodel.Conversation)(nil)).
Where("? = ?", bun.Ident("id"), id).
Exec(ctx)
return err
}
func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error {
defer func() {
// Invalidate any cached conversations and conversation IDs owned by this account on return.
// Conversation invalidate hooks only invalidate the conversation ID cache,
// so we don't need to load all conversations into the cache to run invalidation hooks,
// as with some other object types (blocks, for example).
c.state.Caches.GTS.Conversation.Invalidate("AccountID", accountID)
// In case there were no cached conversations,
// explicitly invalidate the user's conversation last status ID cache.
c.state.Caches.GTS.ConversationLastStatusIDs.Invalidate(accountID)
}()
return c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// Delete conversations matching the account ID.
deletedConversationIDs := []string{}
if err := tx.NewDelete().
Model((*gtsmodel.Conversation)(nil)).
Where("? = ?", bun.Ident("account_id"), accountID).
Returning("?", bun.Ident("id")).
Scan(ctx, &deletedConversationIDs); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversations for account %s: %w", accountID, err)
}
// Delete any conversation-to-status links matching the deleted conversation IDs.
if _, err := tx.NewDelete().
Model((*gtsmodel.ConversationToStatus)(nil)).
Where("? IN (?)", bun.Ident("conversation_id"), bun.In(deletedConversationIDs)).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversation-to-status links for account %s: %w", accountID, err)
}
return nil
})
}
func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error {
// SQL returning the current time.
var nowSQL string
switch c.db.Dialect().Name() {
case dialect.SQLite:
nowSQL = "DATE('now')"
case dialect.PG:
nowSQL = "NOW()"
default:
log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db)
}
updatedConversationIDs := []string{}
deletedConversationIDs := []string{}
if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// Delete this status from conversation-to-status links.
if _, err := tx.NewDelete().
Model((*gtsmodel.ConversationToStatus)(nil)).
Where("? = ?", bun.Ident("status_id"), statusID).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err)
}
// Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here.
// Create a temporary table with all statuses other than the deleted status
// in each conversation for which the deleted status is the last status
// (if there are such statuses).
conversationStatusesTempTable := "conversation_statuses_" + id.NewULID()
if _, err := tx.NewRaw(
"CREATE TEMPORARY TABLE ? AS ?",
bun.Ident(conversationStatusesTempTable),
tx.NewSelect().
ColumnExpr(
"? AS ?",
bun.Ident("conversations.id"),
bun.Ident("conversation_id"),
).
ColumnExpr(
"? AS ?",
bun.Ident("conversation_to_statuses.status_id"),
bun.Ident("id"),
).
Column("statuses.created_at").
Table("conversations").
Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")).
JoinOn(
"? = ?",
bun.Ident("conversations.id"),
bun.Ident("conversation_to_statuses.conversation_id"),
).
JoinOn(
"? != ?",
bun.Ident("conversation_to_statuses.status_id"),
statusID,
).
Join("LEFT JOIN ?", bun.Ident("statuses")).
JoinOn(
"? = ?",
bun.Ident("conversation_to_statuses.status_id"),
bun.Ident("statuses.id"),
).
Where(
"? = ?",
bun.Ident("conversations.last_status_id"),
statusID,
),
).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err)
}
// Create a temporary table with the most recently created status in each conversation
// for which the deleted status is the last status (if there is such a status).
latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID()
if _, err := tx.NewRaw(
"CREATE TEMPORARY TABLE ? AS ?",
bun.Ident(latestConversationStatusesTempTable),
tx.NewSelect().
Column(
"conversation_statuses.conversation_id",
"conversation_statuses.id",
).
TableExpr(
"? AS ?",
bun.Ident(conversationStatusesTempTable),
bun.Ident("conversation_statuses"),
).
Join(
"LEFT JOIN ? AS ?",
bun.Ident(conversationStatusesTempTable),
bun.Ident("later_statuses"),
).
JoinOn(
"? = ?",
bun.Ident("conversation_statuses.conversation_id"),
bun.Ident("later_statuses.conversation_id"),
).
JoinOn(
"? > ?",
bun.Ident("later_statuses.created_at"),
bun.Ident("conversation_statuses.created_at"),
).
Where("? IS NULL", bun.Ident("later_statuses.id")),
).
Exec(ctx); // nocollapse
err != nil {
return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err)
}
// For every conversation where the given status was the last one,
// reset its last status to the most recently created in the conversation other than that one,
// if there is such a status.
// Return conversation IDs for invalidation.
if err := tx.NewUpdate().
Model((*gtsmodel.Conversation)(nil)).
SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")).
SetColumn("updated_at", "?", bun.Safe(nowSQL)).
TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")).
Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")).
Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")).
Returning("?TableName.?", bun.Ident("id")).
Scan(ctx, &updatedConversationIDs); // nocollapse
err != nil {
return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err)
}
// If there is no such status, delete the conversation.
// Return conversation IDs for invalidation.
if err := tx.NewDelete().
Model((*gtsmodel.Conversation)(nil)).
Where(
"? IN (?)",
bun.Ident("id"),
tx.NewSelect().
Table(latestConversationStatusesTempTable).
Column("conversation_id").
Where("? IS NULL", bun.Ident("id")),
).
Returning("?", bun.Ident("id")).
Scan(ctx, &deletedConversationIDs); // nocollapse
err != nil {
return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err)
}
// Clean up.
for _, tempTable := range []string{
conversationStatusesTempTable,
latestConversationStatusesTempTable,
} {
if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil {
return gtserror.Newf(
"error dropping temporary table %s after deleting status %s: %w",
tempTable,
statusID,
err,
)
}
}
return nil
}); err != nil {
return err
}
updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)
c.state.Caches.GTS.Conversation.InvalidateIDs("ID", updatedConversationIDs)
return nil
}

View file

@ -0,0 +1,115 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package bundb_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/db/test"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
type ConversationTestSuite struct {
BunDBStandardTestSuite
cf test.ConversationFactory
// testAccount is the owner of statuses and conversations in these tests (must be local).
testAccount *gtsmodel.Account
// threadID is the thread used for statuses in any given test.
threadID string
}
func (suite *ConversationTestSuite) SetupSuite() {
suite.BunDBStandardTestSuite.SetupSuite()
suite.cf.SetupSuite(suite)
suite.testAccount = suite.testAccounts["local_account_1"]
}
func (suite *ConversationTestSuite) SetupTest() {
suite.BunDBStandardTestSuite.SetupTest()
suite.cf.SetupTest(suite.db)
suite.threadID = suite.cf.NewULID(0)
}
// deleteStatus deletes a status from conversations and ends the test if that fails.
func (suite *ConversationTestSuite) deleteStatus(statusID string) {
err := suite.db.DeleteStatusFromConversations(context.Background(), statusID)
if err != nil {
suite.FailNow(err.Error())
}
}
// getConversation fetches a conversation by ID and ends the test if that fails.
func (suite *ConversationTestSuite) getConversation(conversationID string) *gtsmodel.Conversation {
conversation, err := suite.db.GetConversationByID(context.Background(), conversationID)
if err != nil {
suite.FailNow(err.Error())
}
return conversation
}
// If we delete a status that is in a conversation but not the last status,
// the conversation's last status should not change.
func (suite *ConversationTestSuite) TestDeleteNonLastStatus() {
conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
initial := conversation.LastStatus
reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial)
conversation = suite.cf.SetLastStatus(conversation, reply)
suite.deleteStatus(initial.ID)
conversation = suite.getConversation(conversation.ID)
suite.Equal(reply.ID, conversation.LastStatusID)
}
// If we delete the last status in a conversation that has other statuses,
// a previous status should become the new last status.
func (suite *ConversationTestSuite) TestDeleteLastStatus() {
conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
initial := conversation.LastStatus
reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial)
conversation = suite.cf.SetLastStatus(conversation, reply)
conversation = suite.getConversation(conversation.ID)
suite.deleteStatus(reply.ID)
conversation = suite.getConversation(conversation.ID)
suite.Equal(initial.ID, conversation.LastStatusID)
}
// If we delete the only status in a conversation,
// the conversation should be deleted as well.
func (suite *ConversationTestSuite) TestDeleteOnlyStatus() {
conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
initial := conversation.LastStatus
suite.deleteStatus(initial.ID)
_, err := suite.db.GetConversationByID(context.Background(), conversation.ID)
suite.ErrorIs(err, db.ErrNoEntries)
}
func TestConversationTestSuite(t *testing.T) {
suite.Run(t, new(ConversationTestSuite))
}

View file

@ -0,0 +1,78 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package migrations
import (
"context"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/uptrace/bun"
)
// Note: this migration has an advanced migration followup.
// See Conversations.MigrateDMs().
func init() {
up := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
for _, model := range []interface{}{
&gtsmodel.Conversation{},
&gtsmodel.ConversationToStatus{},
} {
if _, err := tx.
NewCreateTable().
Model(model).
IfNotExists().
Exec(ctx); err != nil {
return err
}
}
// Add indexes to the conversations table.
for index, columns := range map[string][]string{
"conversations_account_id_idx": {
"account_id",
},
"conversations_last_status_id_idx": {
"last_status_id",
},
} {
if _, err := tx.
NewCreateIndex().
Model(&gtsmodel.Conversation{}).
Index(index).
Column(columns...).
IfNotExists().
Exec(ctx); err != nil {
return err
}
}
return nil
})
}
down := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return nil
})
}
if err := Migrations.Register(up, down); err != nil {
panic(err)
}
}

View file

@ -0,0 +1,49 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package migrations
import (
"context"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/uptrace/bun"
)
// Create the advanced migrations table.
func init() {
up := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
_, err := tx.
NewCreateTable().
Model((*gtsmodel.AdvancedMigration)(nil)).
IfNotExists().
Exec(ctx)
return err
})
}
down := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return nil
})
}
if err := Migrations.Register(up, down); err != nil {
panic(err)
}
}

View file

@ -682,3 +682,35 @@ func (s *statusDB) getStatusBoostIDs(ctx context.Context, statusID string) ([]st
return statusIDs, nil return statusIDs, nil
}) })
} }
func (s *statusDB) MaxDirectStatusID(ctx context.Context) (string, error) {
maxID := ""
if err := s.db.
NewSelect().
Model((*gtsmodel.Status)(nil)).
ColumnExpr("COALESCE(MAX(?), '')", bun.Ident("id")).
Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect).
Scan(ctx, &maxID); // nocollapse
err != nil {
return "", err
}
return maxID, nil
}
func (s *statusDB) GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error) {
var statusIDs []string
if err := s.db.
NewSelect().
Model((*gtsmodel.Status)(nil)).
Column("id").
Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect).
Where("? > ?", bun.Ident("id"), minID).
Where("? <= ?", bun.Ident("id"), maxIDInclusive).
Order("id ASC").
Limit(count).
Scan(ctx, &statusIDs); // nocollapse
err != nil {
return nil, err
}
return statusIDs, nil
}

View file

@ -0,0 +1,52 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package db
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/paging"
)
type Conversation interface {
// GetConversationByID gets a single conversation by ID.
GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error)
// GetConversationByThreadAndAccountIDs retrieves a conversation by thread ID and participant account IDs, if it exists.
GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error)
// GetConversationsByOwnerAccountID gets all conversations owned by the given account,
// with optional paging based on last status ID.
GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error)
// UpsertConversation creates or updates a conversation.
UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error
// LinkConversationToStatus creates a conversation-to-status link.
LinkConversationToStatus(ctx context.Context, statusID string, conversationID string) error
// DeleteConversationByID deletes a conversation, removing it from the owning account's conversation list.
DeleteConversationByID(ctx context.Context, id string) error
// DeleteConversationsByOwnerAccountID deletes all conversations owned by the given account.
DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error
// DeleteStatusFromConversations handles when a status is deleted by updating or deleting conversations for which it was the last status.
DeleteStatusFromConversations(ctx context.Context, statusID string) error
}

View file

@ -26,8 +26,10 @@
type DB interface { type DB interface {
Account Account
Admin Admin
AdvancedMigration
Application Application
Basic Basic
Conversation
Domain Domain
Emoji Emoji
HeaderFilter HeaderFilter

View file

@ -78,4 +78,16 @@ type Status interface {
// GetStatusChildren gets the child statuses of a given status. // GetStatusChildren gets the child statuses of a given status.
GetStatusChildren(ctx context.Context, statusID string) ([]*gtsmodel.Status, error) GetStatusChildren(ctx context.Context, statusID string) ([]*gtsmodel.Status, error)
// MaxDirectStatusID returns the newest ID across all DM statuses.
// Returns the empty string with no error if there are no DM statuses yet.
// It is used only by the conversation advanced migration.
MaxDirectStatusID(ctx context.Context) (string, error)
// GetDirectStatusIDsBatch returns up to count DM status IDs strictly greater than minID
// and less than or equal to maxIDInclusive. Note that this is different from most of our paging,
// which uses a maxID and returns IDs strictly less than that, because it's called with the result of
// MaxDirectStatusID, and expects to eventually return the status with that ID.
// It is used only by the conversation advanced migration.
GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error)
} }

View file

@ -0,0 +1,122 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package test
import (
"context"
"crypto/rand"
"time"
"github.com/oklog/ulid"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
type testSuite interface {
FailNow(string, ...interface{}) bool
}
// ConversationFactory can be embedded or included by test suites that want to generate statuses and conversations.
type ConversationFactory struct {
// Test suite, or at least the methods from it that we care about.
suite testSuite
// Test DB.
db db.DB
// TestStart is the timestamp used as a base for timestamps and ULIDs in any given test.
TestStart time.Time
}
// SetupSuite should be called by the SetupSuite of test suites that use this mixin.
func (f *ConversationFactory) SetupSuite(suite testSuite) {
f.suite = suite
}
// SetupTest should be called by the SetupTest of test suites that use this mixin.
func (f *ConversationFactory) SetupTest(db db.DB) {
f.db = db
f.TestStart = time.Now()
}
// NewULID is a version of id.NewULID that uses the test start time and an offset instead of the real time.
func (f *ConversationFactory) NewULID(offset time.Duration) string {
ulid, err := ulid.New(
ulid.Timestamp(f.TestStart.Add(offset)), rand.Reader,
)
if err != nil {
panic(err)
}
return ulid.String()
}
func (f *ConversationFactory) NewTestStatus(localAccount *gtsmodel.Account, threadID string, nowOffset time.Duration, inReplyToStatus *gtsmodel.Status) *gtsmodel.Status {
statusID := f.NewULID(nowOffset)
createdAt := f.TestStart.Add(nowOffset)
status := &gtsmodel.Status{
ID: statusID,
CreatedAt: createdAt,
UpdatedAt: createdAt,
URI: "http://localhost:8080/users/" + localAccount.Username + "/statuses/" + statusID,
AccountID: localAccount.ID,
AccountURI: localAccount.URI,
Local: util.Ptr(true),
ThreadID: threadID,
Visibility: gtsmodel.VisibilityDirect,
ActivityStreamsType: ap.ObjectNote,
Federated: util.Ptr(true),
}
if inReplyToStatus != nil {
status.InReplyToID = inReplyToStatus.ID
status.InReplyToURI = inReplyToStatus.URI
status.InReplyToAccountID = inReplyToStatus.AccountID
}
if err := f.db.PutStatus(context.Background(), status); err != nil {
f.suite.FailNow(err.Error())
}
return status
}
// NewTestConversation creates a new status and adds it to a new unread conversation, returning the conversation.
func (f *ConversationFactory) NewTestConversation(localAccount *gtsmodel.Account, nowOffset time.Duration) *gtsmodel.Conversation {
threadID := f.NewULID(nowOffset)
status := f.NewTestStatus(localAccount, threadID, nowOffset, nil)
conversation := &gtsmodel.Conversation{
ID: f.NewULID(nowOffset),
AccountID: localAccount.ID,
ThreadID: status.ThreadID,
Read: util.Ptr(false),
}
f.SetLastStatus(conversation, status)
return conversation
}
// SetLastStatus sets an already stored status as the last status of a new or already stored conversation,
// and returns the updated conversation.
func (f *ConversationFactory) SetLastStatus(conversation *gtsmodel.Conversation, status *gtsmodel.Status) *gtsmodel.Conversation {
conversation.LastStatusID = status.ID
conversation.LastStatus = status
if err := f.db.UpsertConversation(context.Background(), conversation, "last_status_id"); err != nil {
f.suite.FailNow(err.Error())
}
if err := f.db.LinkConversationToStatus(context.Background(), conversation.ID, status.ID); err != nil {
f.suite.FailNow(err.Error())
}
return conversation
}

View file

@ -0,0 +1,32 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package gtsmodel
import (
"time"
)
// AdvancedMigration stores state for an "advanced migration", which is a migration
// that doesn't fit into the Bun migration framework.
type AdvancedMigration struct {
ID string `bun:",pk,nullzero,notnull,unique"` // id of this migration (preassigned, not a ULID)
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item last updated
StateJSON []byte `bun:",nullzero"` // JSON dump of the migration state
Finished *bool `bun:",nullzero,notnull,default:false"` // has this migration finished?
}

View file

@ -0,0 +1,77 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package gtsmodel
import (
"slices"
"strings"
"time"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// Conversation represents direct messages between the owner account and a set of other accounts.
type Conversation struct {
// ID of this item in the database.
ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"`
// When was this item created?
CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
// When was this item last updated?
UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
// Account that owns the conversation.
AccountID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq,unique:conversations_account_id_last_status_id_uniq"`
Account *Account `bun:"-"`
// Other accounts participating in the conversation.
// Doesn't include the owner. May be empty in the case of a DM to yourself.
OtherAccountIDs []string `bun:"other_account_ids,array"`
OtherAccounts []*Account `bun:"-"`
// Denormalized lookup key derived from unique OtherAccountIDs, sorted and concatenated with commas.
// May be empty in the case of a DM to yourself.
OtherAccountsKey string `bun:",notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"`
// Thread that the conversation is part of.
ThreadID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"`
// ID of the last status in this conversation.
LastStatusID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_account_id_last_status_id_uniq"`
LastStatus *Status `bun:"-"`
// Has the owner read all statuses in this conversation?
Read *bool `bun:",default:false"`
}
// ConversationOtherAccountsKey creates an OtherAccountsKey from a list of OtherAccountIDs.
func ConversationOtherAccountsKey(otherAccountIDs []string) string {
otherAccountIDs = util.UniqueStrings(otherAccountIDs)
slices.Sort(otherAccountIDs)
return strings.Join(otherAccountIDs, ",")
}
// ConversationToStatus is an intermediate struct to facilitate the many2many relationship between a conversation and its statuses,
// including but not limited to the last status. These are used only when deleting a status from a conversation.
type ConversationToStatus struct {
ConversationID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"`
Conversation *Conversation `bun:"rel:belongs-to"`
StatusID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"`
Status *Status `bun:"rel:belongs-to"`
}

View file

@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod
// TODO: add status mutes here when they're implemented. // TODO: add status mutes here when they're implemented.
// Delete all conversations owned by given account.
// Conversations in which it has only participated will be retained;
// they can always be deleted by their owners.
if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error deleting conversations owned by account: %w", err)
}
// Delete all poll votes owned by given account. // Delete all poll votes owned by given account.
if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) { err != nil && !errors.Is(err, db.ErrNoEntries) {

View file

@ -0,0 +1,48 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package advancedmigrations
import (
"context"
"fmt"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
)
// Processor holds references to any other processor that has migrations to run.
type Processor struct {
conversations *conversations.Processor
}
func New(
conversations *conversations.Processor,
) Processor {
return Processor{
conversations: conversations,
}
}
// Migrate runs all advanced migrations.
// Errors should be in the same format thrown by other server or testrig startup failures.
func (p *Processor) Migrate(ctx context.Context) error {
if err := p.conversations.MigrateDMsToConversations(ctx); err != nil {
return fmt.Errorf("error running conversations advanced migration: %w", err)
}
return nil
}

View file

@ -0,0 +1,126 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
"errors"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
)
type Processor struct {
state *state.State
converter *typeutils.Converter
filter *visibility.Filter
}
func New(
state *state.State,
converter *typeutils.Converter,
filter *visibility.Filter,
) Processor {
return Processor{
state: state,
converter: converter,
filter: filter,
}
}
const conversationNotFoundHelpText = "conversation not found"
// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account.
func (p *Processor) getConversationOwnedBy(
ctx context.Context,
id string,
requestingAccount *gtsmodel.Account,
) (*gtsmodel.Conversation, gtserror.WithCode) {
// Get the conversation so that we can check its owning account ID.
conversation, err := p.state.DB.GetConversationByID(ctx, id)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, gtserror.NewErrorInternalError(
gtserror.Newf(
"DB error getting conversation %s for account %s: %w",
id,
requestingAccount.ID,
err,
),
)
}
if conversation == nil {
return nil, gtserror.NewErrorNotFound(
gtserror.Newf(
"conversation %s not found: %w",
id,
err,
),
conversationNotFoundHelpText,
)
}
if conversation.AccountID != requestingAccount.ID {
return nil, gtserror.NewErrorNotFound(
gtserror.Newf(
"conversation %s not owned by account %s: %w",
id,
requestingAccount.ID,
err,
),
conversationNotFoundHelpText,
)
}
return conversation, nil
}
// getFiltersAndMutes gets the given account's filters and compiled mute list.
func (p *Processor) getFiltersAndMutes(
ctx context.Context,
requestingAccount *gtsmodel.Account,
) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) {
filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID)
if err != nil {
return nil, nil, gtserror.NewErrorInternalError(
gtserror.Newf(
"DB error getting filters for account %s: %w",
requestingAccount.ID,
err,
),
)
}
mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil)
if err != nil {
return nil, nil, gtserror.NewErrorInternalError(
gtserror.Newf(
"DB error getting mutes for account %s: %w",
requestingAccount.ID,
err,
),
)
}
compiledMutes := usermute.NewCompiledUserMuteList(mutes)
return filters, compiledMutes, nil
}

View file

@ -0,0 +1,151 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/db"
dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test"
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/testrig"
)
type ConversationsTestSuite struct {
// standard suite interfaces
suite.Suite
db db.DB
tc *typeutils.Converter
storage *storage.Driver
state state.State
mediaManager *media.Manager
transportController transport.Controller
federator *federation.Federator
emailSender email.Sender
sentEmails map[string]string
filter *visibility.Filter
// standard suite models
testTokens map[string]*gtsmodel.Token
testClients map[string]*gtsmodel.Client
testApplications map[string]*gtsmodel.Application
testUsers map[string]*gtsmodel.User
testAccounts map[string]*gtsmodel.Account
testFollows map[string]*gtsmodel.Follow
testAttachments map[string]*gtsmodel.MediaAttachment
testStatuses map[string]*gtsmodel.Status
// module being tested
conversationsProcessor conversations.Processor
// Owner of test conversations
testAccount *gtsmodel.Account
// Mixin for conversation tests
dbtest.ConversationFactory
}
func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) {
ctx := context.Background()
ctx, cncl := context.WithTimeout(ctx, timeout)
defer cncl()
return suite.state.Workers.Client.Queue.PopCtx(ctx)
}
func (suite *ConversationsTestSuite) SetupSuite() {
suite.testTokens = testrig.NewTestTokens()
suite.testClients = testrig.NewTestClients()
suite.testApplications = testrig.NewTestApplications()
suite.testUsers = testrig.NewTestUsers()
suite.testAccounts = testrig.NewTestAccounts()
suite.testFollows = testrig.NewTestFollows()
suite.testAttachments = testrig.NewTestAttachments()
suite.testStatuses = testrig.NewTestStatuses()
suite.ConversationFactory.SetupSuite(suite)
}
func (suite *ConversationsTestSuite) SetupTest() {
suite.state.Caches.Init()
testrig.StartNoopWorkers(&suite.state)
testrig.InitTestConfig()
testrig.InitTestLog()
suite.db = testrig.NewTestDB(&suite.state)
suite.state.DB = suite.db
suite.tc = typeutils.NewConverter(&suite.state)
suite.filter = visibility.NewFilter(&suite.state)
testrig.StartTimelines(
&suite.state,
suite.filter,
suite.tc,
)
suite.storage = testrig.NewInMemoryStorage()
suite.state.Storage = suite.storage
suite.mediaManager = testrig.NewTestMediaManager(&suite.state)
suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media"))
suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager)
suite.sentEmails = make(map[string]string)
suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails)
suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter)
testrig.StandardDBSetup(suite.db, nil)
testrig.StandardStorageSetup(suite.storage, "../../../testrig/media")
suite.ConversationFactory.SetupTest(suite.db)
suite.testAccount = suite.testAccounts["local_account_1"]
}
func (suite *ConversationsTestSuite) TearDownTest() {
conversationModels := []interface{}{
(*gtsmodel.Conversation)(nil),
(*gtsmodel.ConversationToStatus)(nil),
}
for _, model := range conversationModels {
if err := suite.db.DropTable(context.Background(), model); err != nil {
log.Error(context.Background(), err)
}
}
testrig.StandardDBTeardown(suite.db)
testrig.StandardStorageTeardown(suite.storage)
testrig.StopWorkers(&suite.state)
}
func TestConversationsTestSuite(t *testing.T) {
suite.Run(t, new(ConversationsTestSuite))
}

View file

@ -0,0 +1,45 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
func (p *Processor) Delete(
ctx context.Context,
requestingAccount *gtsmodel.Account,
id string,
) gtserror.WithCode {
// Get the conversation so that we can check its owning account ID.
conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount)
if errWithCode != nil {
return errWithCode
}
// Delete the conversation.
if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil {
return gtserror.NewErrorInternalError(err)
}
return nil
}

View file

@ -0,0 +1,27 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import "context"
func (suite *ConversationsTestSuite) TestDelete() {
conversation := suite.NewTestConversation(suite.testAccount, 0)
err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID)
suite.NoError(err)
}

View file

@ -0,0 +1,101 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
"errors"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/paging"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// GetAll returns conversations owned by the given account.
// The additional parameters can be used for paging.
func (p *Processor) GetAll(
ctx context.Context,
requestingAccount *gtsmodel.Account,
page *paging.Page,
) (*apimodel.PageableResponse, gtserror.WithCode) {
conversations, err := p.state.DB.GetConversationsByOwnerAccountID(
ctx,
requestingAccount.ID,
page,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, gtserror.NewErrorInternalError(
gtserror.Newf(
"DB error getting conversations for account %s: %w",
requestingAccount.ID,
err,
),
)
}
// Check for empty response.
count := len(conversations)
if len(conversations) == 0 {
return util.EmptyPageableResponse(), nil
}
// Get the lowest and highest last status ID values, used for paging.
lo := conversations[count-1].LastStatusID
hi := conversations[0].LastStatusID
items := make([]interface{}, 0, count)
filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
if errWithCode != nil {
return nil, errWithCode
}
for _, conversation := range conversations {
// Convert conversation to frontend API model.
apiConversation, err := p.converter.ConversationToAPIConversation(
ctx,
conversation,
requestingAccount,
filters,
mutes,
)
if err != nil {
log.Errorf(
ctx,
"error converting conversation %s to API representation: %v",
conversation.ID,
err,
)
continue
}
// Append conversation to return items.
items = append(items, apiConversation)
}
return paging.PackageResponse(paging.ResponseParams{
Items: items,
Path: "/api/v1/conversations",
Next: page.Next(lo, hi),
Prev: page.Prev(lo, hi),
}), nil
}

View file

@ -0,0 +1,65 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import (
"context"
"time"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
)
func (suite *ConversationsTestSuite) TestGetAll() {
conversation := suite.NewTestConversation(suite.testAccount, 0)
resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) {
apiConversation := resp.Items[0].(*apimodel.Conversation)
suite.Equal(conversation.ID, apiConversation.ID)
suite.True(apiConversation.Unread)
}
}
// Test that conversations with newer last status IDs are returned earlier.
func (suite *ConversationsTestSuite) TestGetAllOrder() {
// Create a new conversation.
conversation1 := suite.NewTestConversation(suite.testAccount, 0)
// Create another new conversation with a last status newer than conversation1's.
conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second)
// Add an even newer status than that to conversation1.
conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus)
conversation1.LastStatusID = conversation1Status2.ID
if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil {
suite.FailNow(err.Error())
}
resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
if suite.NoError(err) && suite.Len(resp.Items, 2) {
// conversation1 should be the first conversation returned.
apiConversation1 := resp.Items[0].(*apimodel.Conversation)
suite.Equal(conversation1.ID, apiConversation1.ID)
// It should have the newest status added to it.
suite.Equal(conversation1.LastStatusID, conversation1Status2.ID)
// conversation2 should be the second conversation returned.
apiConversation2 := resp.Items[1].(*apimodel.Conversation)
suite.Equal(conversation2.ID, apiConversation2.ID)
}
}

View file

@ -0,0 +1,131 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
"encoding/json"
"errors"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
const advancedMigrationID = "20240611190733_add_conversations"
const statusBatchSize = 100
type AdvancedMigrationState struct {
MinID string
MaxIDInclusive string
}
func (p *Processor) MigrateDMsToConversations(ctx context.Context) error {
advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err)
}
state := AdvancedMigrationState{}
if advancedMigration != nil {
// There was a previous migration.
if *advancedMigration.Finished {
// This migration has already been run to completion; we don't need to run it again.
return nil
}
// Otherwise, pick up where we left off.
if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil {
// This should never happen.
return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err)
}
} else {
// Start at the beginning.
state.MinID = id.Lowest
// Find the max ID of all existing statuses.
// This will be the last one we migrate;
// newer ones will be handled by the normal conversation flow.
state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx)
if err != nil {
return gtserror.Newf("couldn't get max DM status ID for migration: %w", err)
}
// Save a new advanced migration record.
advancedMigration = &gtsmodel.AdvancedMigration{
ID: advancedMigrationID,
Finished: util.Ptr(false),
}
if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
// This should never happen.
return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
}
if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
}
}
log.Info(ctx, "migrating DMs to conversations…")
// In batches, get all statuses up to and including the max ID,
// and update conversations for each in order.
for {
// Get status IDs for this batch.
statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize)
if err != nil {
return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err)
}
if len(statusIDs) == 0 {
break
}
log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID)
// Load the batch by IDs.
statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs)
if err != nil {
return gtserror.Newf("couldn't get DM statuses for migration: %w", err)
}
// Update conversations for each status. Don't generate notifications.
for _, status := range statuses {
if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil {
return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err)
}
}
// Save the migration state with the new min ID.
state.MinID = statusIDs[len(statusIDs)-1]
if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
// This should never happen.
return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
}
if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
}
}
// Mark the migration as finished.
advancedMigration.Finished = util.Ptr(true)
if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
}
log.Info(ctx, "finished migrating DMs to conversations.")
return nil
}

View file

@ -0,0 +1,85 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/db/bundb"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
// Test that we can migrate DMs to conversations.
// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs.
func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() {
advancedMigrationID := "20240611190733_add_conversations"
ctx := context.Background()
rawDB := (suite.db).(*bundb.DBService).DB()
// Precondition: we shouldn't have any conversations yet.
numConversations := 0
if err := rawDB.NewSelect().
Model((*gtsmodel.Conversation)(nil)).
ColumnExpr("COUNT(*)").
Scan(ctx, &numConversations); // nocollapse
err != nil {
suite.FailNow(err.Error())
}
suite.Zero(numConversations)
// Precondition: there is no record of the conversations advanced migration.
_, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
suite.ErrorIs(err, db.ErrNoEntries)
// Run the migration, which should not fail.
if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
suite.FailNow(err.Error())
}
// We should now have some conversations.
if err := rawDB.NewSelect().
Model((*gtsmodel.Conversation)(nil)).
ColumnExpr("COUNT(*)").
Scan(ctx, &numConversations); // nocollapse
err != nil {
suite.FailNow(err.Error())
}
suite.NotZero(numConversations)
// The advanced migration should now be marked as finished.
advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
if err != nil {
suite.FailNow(err.Error())
}
if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) {
suite.True(*advancedMigration.Finished)
}
// Run the migration again, which should not fail.
if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
suite.FailNow(err.Error())
}
// However, it shouldn't have done anything, so the advanced migration should not have been updated.
advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
if err != nil {
suite.FailNow(err.Error())
}
suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt)
}

View file

@ -0,0 +1,65 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
func (p *Processor) Read(
ctx context.Context,
requestingAccount *gtsmodel.Account,
id string,
) (*apimodel.Conversation, gtserror.WithCode) {
// Get the conversation, including participating accounts and last status.
conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount)
if errWithCode != nil {
return nil, errWithCode
}
// Mark the conversation as read.
conversation.Read = util.Ptr(true)
if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil {
err = gtserror.Newf("DB error updating conversation %s: %w", id, err)
return nil, gtserror.NewErrorInternalError(err)
}
filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
if errWithCode != nil {
return nil, errWithCode
}
apiConversation, err := p.converter.ConversationToAPIConversation(
ctx,
conversation,
requestingAccount,
filters,
mutes,
)
if err != nil {
err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err)
return nil, gtserror.NewErrorInternalError(err)
}
return apiConversation, nil
}

View file

@ -0,0 +1,34 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import (
"context"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
func (suite *ConversationsTestSuite) TestRead() {
conversation := suite.NewTestConversation(suite.testAccount, 0)
suite.False(util.PtrOrValue(conversation.Read, false))
apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID)
if suite.NoError(err) {
suite.False(apiConversation.Unread)
}
}

View file

@ -0,0 +1,242 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations
import (
"context"
"errors"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
// ConversationNotification carries the arguments to processing/stream.Processor.Conversation.
type ConversationNotification struct {
// AccountID of a local account to deliver the notification to.
AccountID string
// Conversation as the notification payload.
Conversation *apimodel.Conversation
}
// UpdateConversationsForStatus updates all conversations related to a status,
// and returns a map from local account IDs to conversation notifications that should be sent to them.
func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) {
if status.Visibility != gtsmodel.VisibilityDirect {
// Only DMs are considered part of conversations.
return nil, nil
}
if status.BoostOfID != "" {
// Boosts can't be part of conversations.
// FUTURE: This may change if we ever implement quote posts.
return nil, nil
}
if status.ThreadID == "" {
// If the status doesn't have a thread ID, it didn't mention a local account,
// and thus can't be part of a conversation.
return nil, nil
}
// We need accounts to be populated for this.
if err := p.state.DB.PopulateStatus(ctx, status); err != nil {
return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err)
}
// The account which authored the status plus all mentioned accounts.
allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions))
allParticipantsSet[status.AccountID] = status.Account
for _, mention := range status.Mentions {
allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount
}
// Create or update conversations for and send notifications to each local participant.
notifications := make([]ConversationNotification, 0, len(allParticipantsSet))
for _, participant := range allParticipantsSet {
if participant.IsRemote() {
continue
}
localAccount := participant
// If the status is not visible to this account, skip processing it for this account.
visible, err := p.filter.StatusVisible(ctx, localAccount, status)
if err != nil {
log.Errorf(
ctx,
"error checking status %s visibility for account %s: %v",
status.ID,
localAccount.ID,
err,
)
continue
} else if !visible {
continue
}
// Is the status filtered or muted for this user?
// Converting the status to an API status runs the filter/mute checks.
filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount)
if errWithCode != nil {
log.Error(ctx, errWithCode)
continue
}
_, err = p.converter.StatusToAPIStatus(
ctx,
status,
localAccount,
statusfilter.FilterContextNotifications,
filters,
mutes,
)
if err != nil {
// If the status matched a hide filter, skip processing it for this account.
// If there was another kind of error, log that and skip it anyway.
if !errors.Is(err, statusfilter.ErrHideStatus) {
log.Errorf(
ctx,
"error checking status %s filtering/muting for account %s: %v",
status.ID,
localAccount.ID,
err,
)
}
continue
}
// Collect other accounts participating in the conversation.
otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1)
otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1)
for accountID, account := range allParticipantsSet {
if accountID != localAccount.ID {
otherAccounts = append(otherAccounts, account)
otherAccountIDs = append(otherAccountIDs, accountID)
}
}
// Check for a previously existing conversation, if there is one.
conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs(
ctx,
status.ThreadID,
localAccount.ID,
otherAccountIDs,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
log.Errorf(
ctx,
"error trying to find a previous conversation for status %s and account %s: %v",
status.ID,
localAccount.ID,
err,
)
continue
}
if conversation == nil {
// Create a new conversation.
conversation = &gtsmodel.Conversation{
ID: id.NewULID(),
AccountID: localAccount.ID,
OtherAccountIDs: otherAccountIDs,
OtherAccounts: otherAccounts,
OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs),
ThreadID: status.ThreadID,
Read: util.Ptr(true),
}
}
// Assume that if the conversation owner posted the status, they've already read it.
statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID
// Update the conversation.
// If there is no previous last status or this one is more recently created, set it as the last status.
if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) {
conversation.LastStatusID = status.ID
conversation.LastStatus = status
}
// If the conversation is unread, leave it marked as unread.
// If the conversation is read but this status might not have been, mark the conversation as unread.
if !statusAuthoredByConversationOwner {
conversation.Read = util.Ptr(false)
}
// Create or update the conversation.
err = p.state.DB.UpsertConversation(ctx, conversation)
if err != nil {
log.Errorf(
ctx,
"error creating or updating conversation %s for status %s and account %s: %v",
conversation.ID,
status.ID,
localAccount.ID,
err,
)
continue
}
// Link the conversation to the status.
if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil {
log.Errorf(
ctx,
"error linking conversation %s to status %s: %v",
conversation.ID,
status.ID,
err,
)
continue
}
// Convert the conversation to API representation.
apiConversation, err := p.converter.ConversationToAPIConversation(
ctx,
conversation,
localAccount,
filters,
mutes,
)
if err != nil {
// If the conversation's last status matched a hide filter, skip it.
// If there was another kind of error, log that and skip it anyway.
if !errors.Is(err, statusfilter.ErrHideStatus) {
log.Errorf(
ctx,
"error converting conversation %s to API representation for account %s: %v",
status.ID,
localAccount.ID,
err,
)
}
continue
}
// Generate a notification,
// unless the status was authored by the user who would be notified,
// in which case they already know.
if status.AccountID != localAccount.ID {
notifications = append(notifications, ConversationNotification{
AccountID: localAccount.ID,
Conversation: apiConversation,
})
}
}
return notifications, nil
}

View file

@ -0,0 +1,54 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package conversations_test
import (
"context"
)
// Test that we can create conversations when a new status comes in.
func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() {
ctx := context.Background()
// Precondition: the test user shouldn't have any conversations yet.
conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
if err != nil {
suite.FailNow(err.Error())
}
suite.Empty(conversations)
// Create a status.
threadID := suite.NewULID(0)
status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil)
// Update conversations for it.
notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status)
if err != nil {
suite.FailNow(err.Error())
}
// In this test, the user is DMing themself, and should not receive a notification from that.
suite.Empty(notifications)
// The test user should have a conversation now.
conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
if err != nil {
suite.FailNow(err.Error())
}
suite.NotEmpty(conversations)
}

View file

@ -27,7 +27,9 @@
"github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin" "github.com/superseriousbusiness/gotosocial/internal/processing/admin"
"github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations"
"github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/processing/common"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/fedi" "github.com/superseriousbusiness/gotosocial/internal/processing/fedi"
filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1" filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1"
filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2" filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2"
@ -70,22 +72,24 @@ type Processor struct {
SUB-PROCESSORS SUB-PROCESSORS
*/ */
account account.Processor account account.Processor
admin admin.Processor admin admin.Processor
fedi fedi.Processor advancedmigrations advancedmigrations.Processor
filtersv1 filtersv1.Processor conversations conversations.Processor
filtersv2 filtersv2.Processor fedi fedi.Processor
list list.Processor filtersv1 filtersv1.Processor
markers markers.Processor filtersv2 filtersv2.Processor
media media.Processor list list.Processor
polls polls.Processor markers markers.Processor
report report.Processor media media.Processor
search search.Processor polls polls.Processor
status status.Processor report report.Processor
stream stream.Processor search search.Processor
timeline timeline.Processor status status.Processor
user user.Processor stream stream.Processor
workers workers.Processor timeline timeline.Processor
user user.Processor
workers workers.Processor
} }
func (p *Processor) Account() *account.Processor { func (p *Processor) Account() *account.Processor {
@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor {
return &p.admin return &p.admin
} }
func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor {
return &p.advancedmigrations
}
func (p *Processor) Conversations() *conversations.Processor {
return &p.conversations
}
func (p *Processor) Fedi() *fedi.Processor { func (p *Processor) Fedi() *fedi.Processor {
return &p.fedi return &p.fedi
} }
@ -188,6 +200,7 @@ func NewProcessor(
// processors + pin them to this struct. // processors + pin them to this struct.
processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc) processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc)
processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender) processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender)
processor.conversations = conversations.New(state, converter, filter)
processor.fedi = fedi.New(state, &common, converter, federator, filter) processor.fedi = fedi.New(state, &common, converter, federator, filter)
processor.filtersv1 = filtersv1.New(state, converter, &processor.stream) processor.filtersv1 = filtersv1.New(state, converter, &processor.stream)
processor.filtersv2 = filtersv2.New(state, converter, &processor.stream) processor.filtersv2 = filtersv2.New(state, converter, &processor.stream)
@ -200,6 +213,9 @@ func NewProcessor(
processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc) processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc)
processor.user = user.New(state, converter, oauthServer, emailSender) processor.user = user.New(state, converter, oauthServer, emailSender)
// The advanced migrations processor sequences advanced migrations from all other processors.
processor.advancedmigrations = advancedmigrations.New(&processor.conversations)
// Workers processor handles asynchronous // Workers processor handles asynchronous
// worker jobs; instantiate it separately // worker jobs; instantiate it separately
// and pass subset of sub processors it needs. // and pass subset of sub processors it needs.
@ -212,6 +228,7 @@ func NewProcessor(
&processor.account, &processor.account,
&processor.media, &processor.media,
&processor.stream, &processor.stream,
&processor.conversations,
) )
return processor return processor

View file

@ -0,0 +1,44 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package stream
import (
"context"
"encoding/json"
"codeberg.org/gruf/go-byteutil"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/stream"
)
// Conversation streams the given conversation to any open, appropriate streams belonging to the given account.
func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) {
b, err := json.Marshal(conversation)
if err != nil {
log.Errorf(ctx, "error marshaling json: %v", err)
return
}
p.streams.Post(ctx, accountID, stream.Message{
Payload: byteutil.B2S(b),
Event: stream.EventTypeConversation,
Stream: []string{
stream.TimelineDirect,
},
})
}

View file

@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus(
visibility gtsmodel.Visibility, visibility gtsmodel.Visibility,
replyToStatus *gtsmodel.Status, replyToStatus *gtsmodel.Status,
boostOfStatus *gtsmodel.Status, boostOfStatus *gtsmodel.Status,
mentionedAccounts []*gtsmodel.Account,
createThread bool,
) *gtsmodel.Status { ) *gtsmodel.Status {
var ( var (
protocol = config.GetProtocol() protocol = config.GetProtocol()
@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus(
newStatus.Visibility = boostOfStatus.Visibility newStatus.Visibility = boostOfStatus.Visibility
} }
for _, mentionedAccount := range mentionedAccounts {
newMention := &gtsmodel.Mention{
ID: id.NewULID(),
StatusID: newStatus.ID,
Status: newStatus,
OriginAccountID: account.ID,
OriginAccountURI: account.URI,
OriginAccount: account,
TargetAccountID: mentionedAccount.ID,
TargetAccount: mentionedAccount,
Silent: util.Ptr(false),
}
newStatus.Mentions = append(newStatus.Mentions, newMention)
newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID)
if err := state.DB.PutMention(ctx, newMention); err != nil {
suite.FailNow(err.Error())
}
}
if createThread {
newThread := &gtsmodel.Thread{
ID: id.NewULID(),
}
newStatus.ThreadID = newThread.ID
if err := state.DB.PutThread(ctx, newThread); err != nil {
suite.FailNow(err.Error())
}
}
// Put the status in the db, to mimic what would // Put the status in the db, to mimic what would
// have already happened earlier up the flow. // have already happened earlier up the flow.
if err := state.DB.PutStatus(ctx, newStatus); err != nil { if err := state.DB.PutStatus(ctx, newStatus); err != nil {
@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON(
return string(statusJSON) return string(statusJSON)
} }
func (suite *FromClientAPITestSuite) conversationJSON(
ctx context.Context,
typeConverter *typeutils.Converter,
conversation *gtsmodel.Conversation,
requestingAccount *gtsmodel.Account,
) string {
apiConversation, err := typeConverter.ConversationToAPIConversation(
ctx,
conversation,
requestingAccount,
nil,
nil,
)
if err != nil {
suite.FailNow(err.Error())
}
conversationJSON, err := json.Marshal(apiConversation)
if err != nil {
suite.FailNow(err.Error())
}
return string(conversationJSON)
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
testStructs := suite.SetupTestStructs() testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs) defer suite.TearDownTestStructs(testStructs)
@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
nil, nil,
nil, nil,
nil,
false,
) )
) )
@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil, nil,
nil,
false,
) )
) )
@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_1_status_1"], suite.testStatuses["local_account_1_status_1"],
nil, nil,
nil,
false,
) )
threadMute = &gtsmodel.ThreadMute{ threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE", ID: "01HD3KRMBB1M85QRWHD912QWRE",
@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
nil, nil,
suite.testStatuses["local_account_1_status_1"], suite.testStatuses["local_account_1_status_1"],
nil,
false,
) )
threadMute = &gtsmodel.ThreadMute{ threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE", ID: "01HD3KRMBB1M85QRWHD912QWRE",
@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil, nil,
nil,
false,
) )
) )
@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil, nil,
nil,
false,
) )
) )
@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil, nil,
nil,
false,
) )
) )
@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
nil, nil,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil,
false,
) )
) )
@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
gtsmodel.VisibilityPublic, gtsmodel.VisibilityPublic,
nil, nil,
suite.testStatuses["local_account_2_status_1"], suite.testStatuses["local_account_2_status_1"],
nil,
false,
) )
) )
@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
) )
} }
// A DM to a local user should create a conversation and accompanying notification.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
postingAccount = suite.testAccounts["local_account_2"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx,
testStructs.Processor,
receivingAccount,
nil,
)
homeStream = streams[stream.TimelineHome]
directStream = streams[stream.TimelineDirect]
// turtle posts a new top-level DM mentioning zork.
status = suite.newStatus(
ctx,
testStructs.State,
postingAccount,
gtsmodel.VisibilityDirect,
nil,
nil,
[]*gtsmodel.Account{receivingAccount},
true,
)
)
// Process the new status.
if err := testStructs.Processor.Workers().ProcessFromClientAPI(
ctx,
&messages.FromClientAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
GTSModel: status,
Origin: postingAccount,
},
); err != nil {
suite.FailNow(err.Error())
}
// Locate the conversation which should now exist for zork.
conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs(
ctx,
status.ThreadID,
receivingAccount.ID,
[]string{postingAccount.ID},
)
if err != nil {
suite.FailNow(err.Error())
}
// Check status in home stream.
suite.checkStreamed(
homeStream,
true,
"",
stream.EventTypeUpdate,
)
// Check mention notification in home stream.
suite.checkStreamed(
homeStream,
true,
"",
stream.EventTypeNotification,
)
// Check conversation in direct stream.
conversationJSON := suite.conversationJSON(
ctx,
testStructs.TypeConverter,
conversation,
receivingAccount,
)
suite.checkStreamed(
directStream,
true,
conversationJSON,
stream.EventTypeConversation,
)
}
// A public message to a local user should not result in a conversation notification.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
postingAccount = suite.testAccounts["local_account_2"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx,
testStructs.Processor,
receivingAccount,
nil,
)
homeStream = streams[stream.TimelineHome]
directStream = streams[stream.TimelineDirect]
// turtle posts a new top-level public message mentioning zork.
status = suite.newStatus(
ctx,
testStructs.State,
postingAccount,
gtsmodel.VisibilityPublic,
nil,
nil,
[]*gtsmodel.Account{receivingAccount},
true,
)
)
// Process the new status.
if err := testStructs.Processor.Workers().ProcessFromClientAPI(
ctx,
&messages.FromClientAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
GTSModel: status,
Origin: postingAccount,
},
); err != nil {
suite.FailNow(err.Error())
}
// Check status in home stream.
suite.checkStreamed(
homeStream,
true,
"",
stream.EventTypeUpdate,
)
// Check mention notification in home stream.
suite.checkStreamed(
homeStream,
true,
"",
stream.EventTypeNotification,
)
// Check for absence of conversation notification in direct stream.
suite.checkStreamed(
directStream,
false,
"",
"",
)
}
func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
testStructs := suite.SetupTestStructs() testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs) defer suite.TearDownTestStructs(testStructs)

View file

@ -20,6 +20,7 @@
import ( import (
"github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/typeutils"
@ -32,9 +33,10 @@
// - sending a notification to a user // - sending a notification to a user
// - sending an email // - sending an email
type Surface struct { type Surface struct {
State *state.State State *state.State
Converter *typeutils.Converter Converter *typeutils.Converter
Stream *stream.Processor Stream *stream.Processor
Filter *visibility.Filter Filter *visibility.Filter
EmailSender email.Sender EmailSender email.Sender
Conversations *conversations.Processor
} }

View file

@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {
defer suite.TearDownTestStructs(testStructs) defer suite.TearDownTestStructs(testStructs)
surface := &workers.Surface{ surface := &workers.Surface{
State: testStructs.State, State: testStructs.State,
Converter: testStructs.TypeConverter, Converter: testStructs.TypeConverter,
Stream: testStructs.Processor.Stream(), Stream: testStructs.Processor.Stream(),
Filter: visibility.NewFilter(testStructs.State), Filter: visibility.NewFilter(testStructs.State),
EmailSender: testStructs.EmailSender, EmailSender: testStructs.EmailSender,
Conversations: testStructs.Processor.Conversations(),
} }
var ( var (

View file

@ -36,8 +36,8 @@
// and LIST timelines of accounts that follow the status author. // and LIST timelines of accounts that follow the status author.
// //
// It will also handle notifications for any mentions attached to // It will also handle notifications for any mentions attached to
// the account, and notifications for any local accounts that want // the account, notifications for any local accounts that want
// to know when this account posts. // to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc. // Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil { if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
} }
// Update any conversations containing this status, and send conversation notifications.
notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
if err != nil {
return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
}
for _, notification := range notifications {
s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
}
return nil return nil
} }

View file

@ -137,6 +137,11 @@ func (u *utils) wipeStatus(
errs.Appendf("error deleting status from timelines: %w", err) errs.Appendf("error deleting status from timelines: %w", err)
} }
// delete this status from any conversations that it's part of
if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil {
errs.Appendf("error deleting status from conversations: %w", err)
}
// finally, delete the status itself // finally, delete the status itself
if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil { if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {
errs.Appendf("error deleting status: %w", err) errs.Appendf("error deleting status: %w", err)

View file

@ -22,6 +22,7 @@
"github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/state"
@ -44,6 +45,7 @@ func New(
account *account.Processor, account *account.Processor,
media *media.Processor, media *media.Processor,
stream *stream.Processor, stream *stream.Processor,
conversations *conversations.Processor,
) Processor { ) Processor {
// Init federate logic // Init federate logic
// wrapper struct. // wrapper struct.
@ -56,11 +58,12 @@ func New(
// Init surface logic // Init surface logic
// wrapper struct. // wrapper struct.
surface := &Surface{ surface := &Surface{
State: state, State: state,
Converter: converter, Converter: converter,
Stream: stream, Stream: stream,
Filter: filter, Filter: filter,
EmailSender: emailSender, EmailSender: emailSender,
Conversations: conversations,
} }
// Init shared util funcs. // Init shared util funcs.

View file

@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce
stream.TimelineHome, stream.TimelineHome,
stream.TimelinePublic, stream.TimelinePublic,
stream.TimelineNotifications, stream.TimelineNotifications,
stream.TimelineDirect,
} { } {
stream, err := processor.Stream().Open(ctx, account, streamType) stream, err := processor.Stream().Open(ctx, account, streamType)
if err != nil { if err != nil {

View file

@ -46,6 +46,10 @@
// EventTypeFiltersChanged -- the user's filters // EventTypeFiltersChanged -- the user's filters
// (including keywords and statuses) have changed. // (including keywords and statuses) have changed.
EventTypeFiltersChanged = "filters_changed" EventTypeFiltersChanged = "filters_changed"
// EventTypeConversation -- a user
// should be shown an updated conversation.
EventTypeConversation = "conversation"
) )
const ( const (

View file

@ -1739,6 +1739,67 @@ func (c *Converter) NotificationToAPINotification(
}, nil }, nil
} }
// ConversationToAPIConversation converts a conversation into its API representation.
// The conversation status will be filtered using the notification filter context,
// and may be nil if the status was hidden.
func (c *Converter) ConversationToAPIConversation(
ctx context.Context,
conversation *gtsmodel.Conversation,
requestingAccount *gtsmodel.Account,
filters []*gtsmodel.Filter,
mutes *usermute.CompiledUserMuteList,
) (*apimodel.Conversation, error) {
apiConversation := &apimodel.Conversation{
ID: conversation.ID,
Unread: !*conversation.Read,
}
for _, account := range conversation.OtherAccounts {
var apiAccount *apimodel.Account
blocked, err := c.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, account.ID)
if err != nil {
return nil, gtserror.Newf(
"DB error checking blocks between accounts %s and %s: %w",
requestingAccount.ID,
account.ID,
err,
)
}
if blocked || account.IsSuspended() {
apiAccount, err = c.AccountToAPIAccountBlocked(ctx, account)
} else {
apiAccount, err = c.AccountToAPIAccountPublic(ctx, account)
}
if err != nil {
return nil, gtserror.Newf(
"error converting account %s to API representation: %w",
account.ID,
err,
)
}
apiConversation.Accounts = append(apiConversation.Accounts, *apiAccount)
}
if conversation.LastStatus != nil {
var err error
apiConversation.LastStatus, err = c.StatusToAPIStatus(
ctx,
conversation.LastStatus,
requestingAccount,
statusfilter.FilterContextNotifications,
filters,
mutes,
)
if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) {
return nil, gtserror.Newf(
"error converting status %s to API representation: %w",
conversation.LastStatus.ID,
err,
)
}
}
return apiConversation, nil
}
// DomainPermToAPIDomainPerm converts a gts model domin block or allow into an api domain permission. // DomainPermToAPIDomainPerm converts a gts model domin block or allow into an api domain permission.
func (c *Converter) DomainPermToAPIDomainPerm( func (c *Converter) DomainPermToAPIDomainPerm(
ctx context.Context, ctx context.Context,

View file

@ -32,6 +32,8 @@ EXPECT=$(cat << "EOF"
"block-mem-ratio": 2, "block-mem-ratio": 2,
"boost-of-ids-mem-ratio": 3, "boost-of-ids-mem-ratio": 3,
"client-mem-ratio": 0.1, "client-mem-ratio": 0.1,
"conversation-last-status-ids-mem-ratio": 2,
"conversation-mem-ratio": 1,
"emoji-category-mem-ratio": 0.1, "emoji-category-mem-ratio": 0.1,
"emoji-mem-ratio": 3, "emoji-mem-ratio": 3,
"filter-keyword-mem-ratio": 0.5, "filter-keyword-mem-ratio": 0.5,