diff --git a/internal/config/helpers.gen.go b/internal/config/helpers.gen.go index 4b46835fb..1947fdadf 100644 --- a/internal/config/helpers.gen.go +++ b/internal/config/helpers.gen.go @@ -1869,4 +1869,3 @@ func GetAdvancedRateLimitRequests() int { return global.GetAdvancedRateLimitRequ // SetAdvancedRateLimitRequests safely sets the value for global configuration 'AdvancedRateLimitRequests' field func SetAdvancedRateLimitRequests(v int) { global.SetAdvancedRateLimitRequests(v) } - diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 21900d47b..0e7bc1cc9 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -26,7 +26,6 @@ "io" "net/url" "strings" - "sync" "time" "github.com/miekg/dns" @@ -478,9 +477,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm if alreadyProcessing, ok := d.dereferencingAvatars[targetAccount.ID]; ok { // we're already on it, no worries processingMedia = alreadyProcessing - } - - if processingMedia == nil { + } else { // we're not already processing it so start now avatarIRI, err := url.Parse(targetAccount.AvatarRemoteURL) if err != nil { @@ -492,6 +489,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm var err error t, err = d.transportController.NewTransportForUsername(ctx, requestingUsername) if err != nil { + d.dereferencingAvatarsLock.Unlock() return false, fmt.Errorf("fetchRemoteAccountMedia: error getting transport for user: %s", err) } } @@ -516,16 +514,27 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm } d.dereferencingAvatarsLock.Unlock() // UNLOCK HERE + load := func(innerCtx context.Context) error { + _, err := processingMedia.LoadAttachment(innerCtx) + return err + } + + cleanup := func() { + d.dereferencingAvatarsLock.Lock() + delete(d.dereferencingAvatars, targetAccount.ID) + d.dereferencingAvatarsLock.Unlock() 
+ } + // block until loaded if required... if blocking { - if err := lockAndLoad(ctx, d.dereferencingAvatarsLock, processingMedia, d.dereferencingAvatars, targetAccount.ID); err != nil { + if err := loadAndCleanup(ctx, load, cleanup); err != nil { return changed, err } } else { // ...otherwise do it async go func() { dlCtx, done := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute)) - if err := lockAndLoad(dlCtx, d.dereferencingAvatarsLock, processingMedia, d.dereferencingAvatars, targetAccount.ID); err != nil { + if err := loadAndCleanup(dlCtx, load, cleanup); err != nil { log.Errorf("fetchRemoteAccountMedia: error during async lock and load of avatar: %s", err) } done() @@ -544,9 +553,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm if alreadyProcessing, ok := d.dereferencingHeaders[targetAccount.ID]; ok { // we're already on it, no worries processingMedia = alreadyProcessing - } - - if processingMedia == nil { + } else { // we're not already processing it so start now headerIRI, err := url.Parse(targetAccount.HeaderRemoteURL) if err != nil { @@ -558,6 +565,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm var err error t, err = d.transportController.NewTransportForUsername(ctx, requestingUsername) if err != nil { + d.dereferencingHeadersLock.Unlock() return false, fmt.Errorf("fetchRemoteAccountMedia: error getting transport for user: %s", err) } } @@ -582,16 +590,27 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm } d.dereferencingHeadersLock.Unlock() // UNLOCK HERE + load := func(innerCtx context.Context) error { + _, err := processingMedia.LoadAttachment(innerCtx) + return err + } + + cleanup := func() { + d.dereferencingHeadersLock.Lock() + delete(d.dereferencingHeaders, targetAccount.ID) + d.dereferencingHeadersLock.Unlock() + } + // block until loaded if required... 
if blocking { - if err := lockAndLoad(ctx, d.dereferencingHeadersLock, processingMedia, d.dereferencingHeaders, targetAccount.ID); err != nil { + if err := loadAndCleanup(ctx, load, cleanup); err != nil { return changed, err } } else { // ...otherwise do it async go func() { dlCtx, done := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute)) - if err := lockAndLoad(dlCtx, d.dereferencingHeadersLock, processingMedia, d.dereferencingHeaders, targetAccount.ID); err != nil { + if err := loadAndCleanup(dlCtx, load, cleanup); err != nil { log.Errorf("fetchRemoteAccountMedia: error during async lock and load of header: %s", err) } done() @@ -615,7 +634,7 @@ func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gts // If we only have IDs, fetch the emojis from the db. We know they're in // there or else they wouldn't have IDs. if len(maybeEmojiIDs) > len(maybeEmojis) { - maybeEmojis = []*gtsmodel.Emoji{} + maybeEmojis = make([]*gtsmodel.Emoji, 0, len(maybeEmojiIDs)) for _, emojiID := range maybeEmojiIDs { maybeEmoji, err := d.db.GetEmojiByID(ctx, emojiID) if err != nil { @@ -716,16 +735,3 @@ func (d *deref) fetchRemoteAccountEmojis(ctx context.Context, targetAccount *gts return changed, nil } - -func lockAndLoad(ctx context.Context, lock *sync.Mutex, processing *media.ProcessingMedia, processingMap map[string]*media.ProcessingMedia, accountID string) error { - // whatever happens, remove the in-process media from the map - defer func() { - lock.Lock() - delete(processingMap, accountID) - lock.Unlock() - }() - - // try and load it - _, err := processing.LoadAttachment(ctx) - return err -} diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index a6cb9b15f..f043e7cbd 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -41,7 +41,7 @@ type Dereferencer interface { GetRemoteInstance(ctx context.Context, 
username string, remoteInstanceURI *url.URL) (*gtsmodel.Instance, error) GetRemoteMedia(ctx context.Context, requestingUsername string, accountID string, remoteURL string, ai *media.AdditionalMediaInfo) (*media.ProcessingMedia, error) - GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) + GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) DereferenceAnnounce(ctx context.Context, announce *gtsmodel.Status, requestingUsername string) error DereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) @@ -58,6 +58,8 @@ type deref struct { dereferencingAvatarsLock *sync.Mutex dereferencingHeaders map[string]*media.ProcessingMedia dereferencingHeadersLock *sync.Mutex + dereferencingEmojis map[string]*media.ProcessingEmoji + dereferencingEmojisLock *sync.Mutex handshakes map[string][]*url.URL handshakeSync *sync.Mutex // mutex to lock/unlock when checking or updating the handshakes map } @@ -73,6 +75,8 @@ func NewDereferencer(db db.DB, typeConverter typeutils.TypeConverter, transportC dereferencingAvatarsLock: &sync.Mutex{}, dereferencingHeaders: make(map[string]*media.ProcessingMedia), dereferencingHeadersLock: &sync.Mutex{}, + dereferencingEmojis: make(map[string]*media.ProcessingEmoji), + dereferencingEmojisLock: &sync.Mutex{}, handshakeSync: &sync.Mutex{}, } } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 2d32c8803..2ac7fc03d 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -31,27 +31,65 @@ "github.com/superseriousbusiness/gotosocial/internal/media" ) -func (d *deref) 
GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { - t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) - if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error creating transport: %s", err) +func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { + var ( + shortcodeDomain = shortcode + "@" + domain + processingEmoji *media.ProcessingEmoji + ) + + d.dereferencingEmojisLock.Lock() // LOCK HERE + + // first check if we're already processing this emoji + if alreadyProcessing, ok := d.dereferencingEmojis[shortcodeDomain]; ok { + // we're already on it, no worries + processingEmoji = alreadyProcessing + } else { + // not processing it yet, let's start + t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) + if err != nil { + d.dereferencingEmojisLock.Unlock() + return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) + } + + derefURI, err := url.Parse(remoteURL) + if err != nil { + d.dereferencingEmojisLock.Unlock() + return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err) + } + + dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { + return t.DereferenceMedia(innerCtx, derefURI) + } + + newProcessing, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh) + if err != nil { + d.dereferencingEmojisLock.Unlock() + return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) + } + + // store it in our map to indicate it's in process + d.dereferencingEmojis[shortcodeDomain] = newProcessing + 
processingEmoji = newProcessing } - derefURI, err := url.Parse(remoteURL) - if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error parsing url: %s", err) + d.dereferencingEmojisLock.Unlock() + + load := func(innerCtx context.Context) error { + _, err := processingEmoji.LoadEmoji(innerCtx) + return err } - dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { - return t.DereferenceMedia(innerCtx, derefURI) + cleanup := func() { + d.dereferencingEmojisLock.Lock() + delete(d.dereferencingEmojis, shortcodeDomain) + d.dereferencingEmojisLock.Unlock() } - processingMedia, err := d.mediaManager.ProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh) - if err != nil { - return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji: %s", err) + if err := loadAndCleanup(ctx, load, cleanup); err != nil { + return nil, err } - return processingMedia, nil + return processingEmoji, nil } func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, requestingUsername string) ([]*gtsmodel.Emoji, error) { @@ -67,32 +105,58 @@ func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, for _, e := range rawEmojis { var gotEmoji *gtsmodel.Emoji var err error + shortcodeDomain := e.Shortcode + "@" + e.Domain - // check if we've already got this emoji in the db - if gotEmoji, err = d.db.GetEmojiByShortcodeDomain(ctx, e.Shortcode, e.Domain); err != nil && err != db.ErrNoEntries { - log.Errorf("populateEmojis: error checking database for emoji %s: %s", e.URI, err) + // check if we already know this emoji + if e.ID != "" { + // we had an ID for this emoji already, which means + // it should be fleshed out already and we won't + // have to get it from the database again + gotEmoji = e + } else if gotEmoji, err = d.db.GetEmojiByShortcodeDomain(ctx, e.Shortcode, e.Domain); err != nil && err != db.ErrNoEntries { + log.Errorf("populateEmojis: error checking database for emoji %s: %s", shortcodeDomain, err) 
continue } + var refresh bool + if gotEmoji != nil { - // we had the emoji in our database already; make sure the one we have is up to date - if (e.UpdatedAt.After(gotEmoji.ImageUpdatedAt)) || (e.URI != gotEmoji.URI) || (e.ImageRemoteURL != gotEmoji.ImageRemoteURL) { + // we had the emoji already, but refresh it if necessary + if e.UpdatedAt.Unix() > gotEmoji.ImageUpdatedAt.Unix() { + log.Tracef("populateEmojis: emoji %s was updated since we last saw it, will refresh", shortcodeDomain) + refresh = true + } + + if !refresh && (e.URI != gotEmoji.URI) { + log.Tracef("populateEmojis: emoji %s changed URI since we last saw it, will refresh", shortcodeDomain) + refresh = true + } + + if !refresh && (e.ImageRemoteURL != gotEmoji.ImageRemoteURL) { + log.Tracef("populateEmojis: emoji %s changed image URL since we last saw it, will refresh", shortcodeDomain) + refresh = true + } + + if !refresh { + log.Tracef("populateEmojis: emoji %s is up to date, will not refresh", shortcodeDomain) + } else { + log.Tracef("populateEmojis: refreshing emoji %s", shortcodeDomain) emojiID := gotEmoji.ID // use existing ID - processingEmoji, err := d.GetRemoteEmoji(ctx, requestingUsername, e.ImageRemoteURL, e.Shortcode, emojiID, e.URI, &media.AdditionalEmojiInfo{ + processingEmoji, err := d.GetRemoteEmoji(ctx, requestingUsername, e.ImageRemoteURL, e.Shortcode, e.Domain, emojiID, e.URI, &media.AdditionalEmojiInfo{ Domain: &e.Domain, ImageRemoteURL: &e.ImageRemoteURL, - ImageStaticRemoteURL: &e.ImageRemoteURL, + ImageStaticRemoteURL: &e.ImageStaticRemoteURL, Disabled: gotEmoji.Disabled, VisibleInPicker: gotEmoji.VisibleInPicker, - }, true) + }, refresh) if err != nil { - log.Errorf("populateEmojis: couldn't refresh remote emoji %s: %s", e.URI, err) + log.Errorf("populateEmojis: couldn't refresh remote emoji %s: %s", shortcodeDomain, err) continue } if gotEmoji, err = processingEmoji.LoadEmoji(ctx); err != nil { - log.Errorf("populateEmojis: couldn't load refreshed remote emoji %s: %s", e.URI, 
err) + log.Errorf("populateEmojis: couldn't load refreshed remote emoji %s: %s", shortcodeDomain, err) continue } } @@ -100,25 +164,25 @@ func (d *deref) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, // it's new! go get it! newEmojiID, err := id.NewRandomULID() if err != nil { - log.Errorf("populateEmojis: error generating id for remote emoji %s: %s", e.URI, err) + log.Errorf("populateEmojis: error generating id for remote emoji %s: %s", shortcodeDomain, err) continue } - processingEmoji, err := d.GetRemoteEmoji(ctx, requestingUsername, e.ImageRemoteURL, e.Shortcode, newEmojiID, e.URI, &media.AdditionalEmojiInfo{ + processingEmoji, err := d.GetRemoteEmoji(ctx, requestingUsername, e.ImageRemoteURL, e.Shortcode, e.Domain, newEmojiID, e.URI, &media.AdditionalEmojiInfo{ Domain: &e.Domain, ImageRemoteURL: &e.ImageRemoteURL, - ImageStaticRemoteURL: &e.ImageRemoteURL, + ImageStaticRemoteURL: &e.ImageStaticRemoteURL, Disabled: e.Disabled, VisibleInPicker: e.VisibleInPicker, - }, false) + }, refresh) if err != nil { - log.Errorf("populateEmojis: couldn't get remote emoji %s: %s", e.URI, err) + log.Errorf("populateEmojis: couldn't get remote emoji %s: %s", shortcodeDomain, err) continue } if gotEmoji, err = processingEmoji.LoadEmoji(ctx); err != nil { - log.Errorf("populateEmojis: couldn't load remote emoji %s: %s", e.URI, err) + log.Errorf("populateEmojis: couldn't load remote emoji %s: %s", shortcodeDomain, err) continue } } diff --git a/internal/federation/dereferencing/emoji_test.go b/internal/federation/dereferencing/emoji_test.go index 3093a1e7f..af3cb3318 100644 --- a/internal/federation/dereferencing/emoji_test.go +++ b/internal/federation/dereferencing/emoji_test.go @@ -51,7 +51,7 @@ func (suite *EmojiTestSuite) TestDereferenceEmojiBlocking() { VisibleInPicker: &emojiVisibleInPicker, } - processingEmoji, err := suite.dereferencer.GetRemoteEmoji(ctx, fetchingAccount.Username, emojiImageRemoteURL, emojiShortcode, emojiID, emojiURI, ai, false) + 
processingEmoji, err := suite.dereferencer.GetRemoteEmoji(ctx, fetchingAccount.Username, emojiImageRemoteURL, emojiShortcode, emojiDomain, emojiID, emojiURI, ai, false) suite.NoError(err) // make a blocking call to load the emoji from the in-process media diff --git a/internal/federation/dereferencing/util.go b/internal/federation/dereferencing/util.go new file mode 100644 index 000000000..73bc1f29d --- /dev/null +++ b/internal/federation/dereferencing/util.go @@ -0,0 +1,31 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +package dereferencing + +import ( + "context" +) + +func loadAndCleanup(ctx context.Context, load func(ctx context.Context) error, cleanup func()) error { + // whatever happens, clean up when we're done + defer cleanup() + + // try and load + return load(ctx) +} diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index ec46ae76d..2ae403931 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -206,6 +206,15 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { } }() + // execute the postData function no matter what happens + defer func() { + if p.postData != nil { + if err := p.postData(ctx); err != nil { + log.Errorf("store: error executing postData: %s", err) + } + } + }() + // extract no more than 261 bytes from the beginning of the file -- this is the header firstBytes := make([]byte, maxFileHeaderBytes) if _, err := rc.Read(firstBytes); err != nil { @@ -259,27 +268,26 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { } // store this for now -- other processes can pull it out of storage as they please - if fileSize, err = putStream(ctx, p.storage, p.emoji.ImagePath, readerToStore, fileSize); err != nil && err != storage.ErrAlreadyExists { - return fmt.Errorf("store: error storing stream: %s", err) + if fileSize, err = putStream(ctx, p.storage, p.emoji.ImagePath, readerToStore, fileSize); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + return fmt.Errorf("store: error storing stream: %s", err) + } + log.Warnf("emoji %s already exists at storage path: %s", p.emoji.ID, p.emoji.ImagePath) } // if we didn't know the fileSize yet, we do now, so check if we need to if !checkedSize && fileSize > maxEmojiSize { - defer func() { - if err := p.storage.Delete(ctx, p.emoji.ImagePath); err != nil { - log.Errorf("store: error removing too-large emoji from the store: %s", err) - } - }() - return fmt.Errorf("store: discovered emoji fileSize (%db) is larger than 
allowed emojiRemoteMaxSize (%db)", fileSize, maxEmojiSize) + err = fmt.Errorf("store: discovered emoji fileSize (%db) is larger than allowed emojiRemoteMaxSize (%db), will delete from the store now", fileSize, maxEmojiSize) + log.Warn(err) + if deleteErr := p.storage.Delete(ctx, p.emoji.ImagePath); deleteErr != nil { + log.Errorf("store: error removing too-large emoji from the store: %s", deleteErr) + } + return err } p.emoji.ImageFileSize = int(fileSize) p.read = true - if p.postData != nil { - return p.postData(ctx) - } - return nil } @@ -303,20 +311,20 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData P originalPostData := postData originalImagePath := emoji.ImagePath originalImageStaticPath := emoji.ImageStaticPath - postData = func(ctx context.Context) error { + postData = func(innerCtx context.Context) error { // trigger the original postData function if it was provided if originalPostData != nil { - if err := originalPostData(ctx); err != nil { + if err := originalPostData(innerCtx); err != nil { return err } } l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain) l.Debug("postData: cleaning up old emoji files for refreshed emoji") - if err := m.storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, gostore.ErrNotFound) { + if err := m.storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, gostore.ErrNotFound) { l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err) } - if err := m.storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, gostore.ErrNotFound) { + if err := m.storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, gostore.ErrNotFound) { l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err) } diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 
1247586cb..c724de849 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -21,6 +21,7 @@ import ( "bytes" "context" + "errors" "fmt" "io" "strings" @@ -349,8 +350,11 @@ func (p *ProcessingMedia) store(ctx context.Context) error { p.attachment.File.Path = fmt.Sprintf("%s/%s/%s/%s.%s", p.attachment.AccountID, TypeAttachment, SizeOriginal, p.attachment.ID, extension) // store this for now -- other processes can pull it out of storage as they please - if fileSize, err = putStream(ctx, p.storage, p.attachment.File.Path, readerToStore, fileSize); err != nil && err != storage.ErrAlreadyExists { - return fmt.Errorf("store: error storing stream: %s", err) + if fileSize, err = putStream(ctx, p.storage, p.attachment.File.Path, readerToStore, fileSize); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + return fmt.Errorf("store: error storing stream: %s", err) + } + log.Warnf("attachment %s already exists at storage path: %s", p.attachment.ID, p.attachment.File.Path) } cached := true