[chore] small changes missed in previous dereferencer.GetAccount() PRs (#1467)

* small formatting changes, rewrite fetchRemoteMedia to use separate funcs + use mutex lock correctly

* move url parsing before acquiring mutex locks

* use wrapped mutexes to allow safe unlocking. (previously i did a fucky and passed mutex by value...)

* remove unused code

* use consistent map keying for dereferencing headers/avatars

---------

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2023-02-10 20:15:23 +00:00 committed by GitHub
parent e5e257c259
commit 6ac1dda96f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 152 additions and 124 deletions

View file

@ -146,6 +146,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
} }
} }
// Pre-fetch a transport for requesting username, used by later deref procedures.
transport, err := d.transportController.NewTransportForUsername(ctx, requestUser) transport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil { if err != nil {
return nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err) return nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err)
@ -163,19 +164,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
if err == nil { if err == nil {
if account.Domain != accDomain { if account.Domain != accDomain {
// We have the correct accountDomain now; if it was different from // After webfinger, we now have correct account domain from which we can do a final DB check.
// the account domain we were provided, do another db lookup to check
// if we already had the account in the db under the account domain we
// just discovered, otherwise we risk thinking this is a new account
// and trying to put it into the database again (which will cause issues).
alreadyAccount, err := d.db.GetAccountByUsernameDomain(ctx, account.Username, accDomain) alreadyAccount, err := d.db.GetAccountByUsernameDomain(ctx, account.Username, accDomain)
if err != nil && !errors.Is(err, db.ErrNoEntries) { if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err) return nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err)
} }
if err == nil { if err == nil {
// We already had the account in the database; // Enrich existing account.
// continue by enriching that one instead.
account = alreadyAccount account = alreadyAccount
} }
} }
@ -197,14 +193,14 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
} }
} }
// Check whether this account URI is a blocked domain / subdomain // Check whether this account URI is a blocked domain / subdomain.
if blocked, err := d.db.IsDomainBlocked(ctx, uri.Host); err != nil { if blocked, err := d.db.IsDomainBlocked(ctx, uri.Host); err != nil {
return nil, newErrDB(fmt.Errorf("enrichAccount: error checking blocked domain: %w", err)) return nil, newErrDB(fmt.Errorf("enrichAccount: error checking blocked domain: %w", err))
} else if blocked { } else if blocked {
return nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host) return nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host)
} }
// Mark deref+update handshake start // Mark deref+update handshake start.
d.startHandshake(requestUser, uri) d.startHandshake(requestUser, uri)
defer d.stopHandshake(requestUser, uri) defer d.stopHandshake(requestUser, uri)
@ -225,7 +221,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
if account.Username == "" { if account.Username == "" {
// No username was provided, so no webfinger was attempted earlier. // No username was provided, so no webfinger was attempted earlier.
// //
// Now we have a username we can attempt it now, this ensures up-to-date accountdomain info. // Now we have a username we can attempt it, this ensures up-to-date accountdomain info.
accDomain, _, err := d.fingerRemoteAccount(ctx, transport, latestAcc.Username, uri.Host) accDomain, _, err := d.fingerRemoteAccount(ctx, transport, latestAcc.Username, uri.Host)
if err == nil { if err == nil {
@ -238,32 +234,32 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
latestAcc.ID = account.ID latestAcc.ID = account.ID
latestAcc.FetchedAt = time.Now() latestAcc.FetchedAt = time.Now()
// Fetch latest account avatar only if remote URI has changed // Use the existing account media attachments by default.
if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL { latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID
d.dereferencingAvatarsLock.Lock() latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID
newAvatarID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.AvatarRemoteURL, latestAcc.ID, d.dereferencingAvatars, true, false)
d.dereferencingAvatarsLock.Unlock() if latestAcc.AvatarRemoteURL != account.AvatarRemoteURL && latestAcc.AvatarRemoteURL != "" {
// Account avatar URL has changed; fetch up-to-date copy and use new media ID.
latestAcc.AvatarMediaAttachmentID, err = d.fetchRemoteAccountAvatar(ctx,
transport,
latestAcc.AvatarRemoteURL,
latestAcc.ID,
)
if err != nil { if err != nil {
log.Errorf("error fetching remote avatar for account %s: %v", uri, err) log.Errorf("error fetching remote avatar for account %s: %v", uri, err)
} else {
latestAcc.AvatarMediaAttachmentID = newAvatarID
} }
} else {
latestAcc.AvatarMediaAttachmentID = account.AvatarMediaAttachmentID // no change / empty url
} }
// Fetch latest account header only if remote URI has changed if latestAcc.HeaderRemoteURL != account.HeaderRemoteURL && latestAcc.HeaderRemoteURL != "" {
if latestAcc.AvatarRemoteURL != "" && latestAcc.AvatarRemoteURL != account.AvatarRemoteURL { // Account header URL has changed; fetch up-to-date copy and use new media ID.
d.dereferencingHeadersLock.Lock() latestAcc.HeaderMediaAttachmentID, err = d.fetchRemoteAccountHeader(ctx,
newHeaderID, err := d.fetchRemoteAccountMedia(ctx, transport, latestAcc.HeaderRemoteURL, latestAcc.ID, d.dereferencingHeaders, false, true) transport,
d.dereferencingHeadersLock.Unlock() latestAcc.HeaderRemoteURL,
latestAcc.ID,
)
if err != nil { if err != nil {
log.Errorf("error fetching remote header for account %s: %v", uri, err) log.Errorf("error fetching remote header for account %s: %v", uri, err)
} else {
latestAcc.HeaderMediaAttachmentID = newHeaderID
} }
} else {
latestAcc.HeaderMediaAttachmentID = account.HeaderMediaAttachmentID // no change / empty url
} }
// Fetch the latest remote account emoji IDs used in account display name/bio. // Fetch the latest remote account emoji IDs used in account display name/bio.
@ -338,47 +334,106 @@ func (d *deref) dereferenceAccountable(ctx context.Context, transport transport.
return nil, newErrWrongType(fmt.Errorf("DereferenceAccountable: type name %s not supported as Accountable", t.GetTypeName())) return nil, newErrWrongType(fmt.Errorf("DereferenceAccountable: type name %s not supported as Accountable", t.GetTypeName()))
} }
func (d *deref) fetchRemoteAccountMedia( func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.Transport, avatarURL string, accountID string) (string, error) {
ctx context.Context, // Parse and validate provided media URL.
transport transport.Transport, avatarURI, err := url.Parse(avatarURL)
mediaRemoteURL string,
targetAccountID string,
dereferencingMap map[string]*media.ProcessingMedia,
avatar bool,
header bool,
) (string, error) {
// first check if we're already processing this media
if alreadyProcessing, ok := dereferencingMap[targetAccountID]; ok {
// we're already on it, nothing else to do
return alreadyProcessing.AttachmentID(), nil
}
avatarIRI, err := url.Parse(mediaRemoteURL)
if err != nil { if err != nil {
return "", err return "", err
} }
data := func(innerCtx context.Context) (io.ReadCloser, int64, error) { // Acquire lock for derefs map.
return transport.DereferenceMedia(innerCtx, avatarIRI) unlock := d.derefAvatarsMu.Lock()
defer unlock()
if processing, ok := d.derefAvatars[avatarURL]; ok {
// we're already dereferencing it, nothing to do.
return processing.AttachmentID(), nil
} }
processingMedia, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccountID, &media.AdditionalMediaInfo{ // Set the media data function to dereference avatar from URI.
RemoteURL: &mediaRemoteURL, data := func(ctx context.Context) (io.ReadCloser, int64, error) {
Avatar: &avatar, return tsport.DereferenceMedia(ctx, avatarURI)
Header: &header, }
// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := false; return &v }(),
RemoteURL: &avatarURL,
}) })
if err != nil { if err != nil {
return "", err return "", err
} }
// store it in our map to indicate it's in process // Store media in map to mark as processing.
dereferencingMap[targetAccountID] = processingMedia d.derefAvatars[avatarURL] = processing
defer delete(dereferencingMap, targetAccountID)
if _, err := processingMedia.LoadAttachment(ctx); err != nil { // Unlock map.
unlock()
defer func() {
// On exit safely remove media from map.
unlock := d.derefAvatarsMu.Lock()
delete(d.derefAvatars, avatarURL)
unlock()
}()
// Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil {
return "", err return "", err
} }
return processingMedia.AttachmentID(), nil return processing.AttachmentID(), nil
}
func (d *deref) fetchRemoteAccountHeader(ctx context.Context, tsport transport.Transport, headerURL string, accountID string) (string, error) {
// Parse and validate provided media URL.
headerURI, err := url.Parse(headerURL)
if err != nil {
return "", err
}
// Acquire lock for derefs map.
unlock := d.derefHeadersMu.Lock()
defer unlock()
if processing, ok := d.derefHeaders[headerURL]; ok {
// we're already dereferencing it, nothing to do.
return processing.AttachmentID(), nil
}
// Set the media data function to dereference header from URI.
data := func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, headerURI)
}
// Create new media processing request from the media manager instance.
processing, err := d.mediaManager.ProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &headerURL,
})
if err != nil {
return "", err
}
// Store media in map to mark as processing.
d.derefHeaders[headerURL] = processing
// Unlock map.
unlock()
defer func() {
// On exit safely remove media from map.
unlock := d.derefHeadersMu.Lock()
delete(d.derefHeaders, headerURL)
unlock()
}()
// Start media attachment loading (blocking call).
if _, err := processing.LoadAttachment(ctx); err != nil {
return "", err
}
return processing.AttachmentID(), nil
} }
func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gtsmodel.Account, requestingUsername string) (bool, error) { func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gtsmodel.Account, requestingUsername string) (bool, error) {

View file

@ -23,6 +23,7 @@
"net/url" "net/url"
"sync" "sync"
"codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
@ -58,30 +59,36 @@ type Dereferencer interface {
} }
type deref struct { type deref struct {
db db.DB db db.DB
typeConverter typeutils.TypeConverter typeConverter typeutils.TypeConverter
transportController transport.Controller transportController transport.Controller
mediaManager media.Manager mediaManager media.Manager
dereferencingAvatars map[string]*media.ProcessingMedia derefAvatars map[string]*media.ProcessingMedia
dereferencingAvatarsLock sync.Mutex derefAvatarsMu mutexes.Mutex
dereferencingHeaders map[string]*media.ProcessingMedia derefHeaders map[string]*media.ProcessingMedia
dereferencingHeadersLock sync.Mutex derefHeadersMu mutexes.Mutex
dereferencingEmojis map[string]*media.ProcessingEmoji derefEmojis map[string]*media.ProcessingEmoji
dereferencingEmojisLock sync.Mutex derefEmojisMu mutexes.Mutex
handshakes map[string][]*url.URL handshakes map[string][]*url.URL
handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map handshakeSync sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map
} }
// NewDereferencer returns a Dereferencer initialized with the given parameters. // NewDereferencer returns a Dereferencer initialized with the given parameters.
func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer { func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer {
return &deref{ return &deref{
db: db, db: db,
typeConverter: typeConverter, typeConverter: typeConverter,
transportController: transportController, transportController: transportController,
mediaManager: mediaManager, mediaManager: mediaManager,
dereferencingAvatars: make(map[string]*media.ProcessingMedia), derefAvatars: make(map[string]*media.ProcessingMedia),
dereferencingHeaders: make(map[string]*media.ProcessingMedia), derefHeaders: make(map[string]*media.ProcessingMedia),
dereferencingEmojis: make(map[string]*media.ProcessingEmoji), derefEmojis: make(map[string]*media.ProcessingEmoji),
handshakes: make(map[string][]*url.URL), handshakes: make(map[string][]*url.URL),
// use wrapped mutexes to allow safely deferring unlock
// even when more granular locks are required (only unlocks once).
derefAvatarsMu: mutexes.WithSafety(mutexes.New()),
derefHeadersMu: mutexes.WithSafety(mutexes.New()),
derefEmojisMu: mutexes.WithSafety(mutexes.New()),
} }
} }

View file

@ -37,23 +37,23 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r
processingEmoji *media.ProcessingEmoji processingEmoji *media.ProcessingEmoji
) )
d.dereferencingEmojisLock.Lock() // LOCK HERE // Acquire lock for derefs map.
unlock := d.derefEmojisMu.Lock()
defer unlock()
// first check if we're already processing this emoji // first check if we're already processing this emoji
if alreadyProcessing, ok := d.dereferencingEmojis[shortcodeDomain]; ok { if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok {
// we're already on it, no worries // we're already on it, no worries
processingEmoji = alreadyProcessing processingEmoji = alreadyProcessing
} else { } else {
// not processing it yet, let's start // not processing it yet, let's start
t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername)
if err != nil { if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err)
} }
derefURI, err := url.Parse(remoteURL) derefURI, err := url.Parse(remoteURL)
if err != nil { if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)
} }
@ -63,29 +63,26 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r
newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh) newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh)
if err != nil { if err != nil {
d.dereferencingEmojisLock.Unlock()
return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err)
} }
// store it in our map to indicate it's in process // store it in our map to indicate it's in process
d.dereferencingEmojis[shortcodeDomain] = newProcessing d.derefEmojis[shortcodeDomain] = newProcessing
processingEmoji = newProcessing processingEmoji = newProcessing
} }
d.dereferencingEmojisLock.Unlock() // Unlock map.
unlock()
load := func(innerCtx context.Context) error { defer func() {
_, err := processingEmoji.LoadEmoji(innerCtx) // On exit safely remove emoji from map.
return err unlock := d.derefEmojisMu.Lock()
} delete(d.derefEmojis, shortcodeDomain)
unlock()
}()
cleanup := func() { // Start emoji attachment loading (blocking call).
d.dereferencingEmojisLock.Lock() if _, err := processingEmoji.LoadEmoji(ctx); err != nil {
delete(d.dereferencingHeaders, shortcodeDomain)
d.dereferencingEmojisLock.Unlock()
}
if err := loadAndCleanup(ctx, load, cleanup); err != nil {
return nil, err return nil, err
} }

View file

@ -1,31 +0,0 @@
/*
GoToSocial
Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package dereferencing
import (
"context"
)
func loadAndCleanup(ctx context.Context, load func(ctx context.Context) error, cleanup func()) error {
// whatever happens, clean up when we're done
defer cleanup()
// try and load
return load(ctx)
}