mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-11-25 13:16:40 +00:00
355 lines
11 KiB
Go
355 lines
11 KiB
Go
|
// GoToSocial
|
||
|
// Copyright (C) GoToSocial Authors admin@gotosocial.org
|
||
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||
|
//
|
||
|
// This program is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Affero General Public License as published by
|
||
|
// the Free Software Foundation, either version 3 of the License, or
|
||
|
// (at your option) any later version.
|
||
|
//
|
||
|
// This program is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Affero General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Affero General Public License
|
||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
package bundb
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"database/sql"
|
||
|
"time"
|
||
|
|
||
|
"github.com/superseriousbusiness/gotosocial/internal/db"
|
||
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||
|
"github.com/uptrace/bun"
|
||
|
"github.com/uptrace/bun/dialect"
|
||
|
"github.com/uptrace/bun/schema"
|
||
|
)
|
||
|
|
||
|
// DB wraps a bun database instance
|
||
|
// to provide common per-dialect SQL error
|
||
|
// conversions to common types, and retries
|
||
|
// on returned busy (SQLite only).
|
||
|
type DB struct {
|
||
|
// our own wrapped db type
|
||
|
// with retry backoff support.
|
||
|
// kept separate to the *bun.DB
|
||
|
// type to be passed into query
|
||
|
// builders as bun.IConn iface
|
||
|
// (this prevents double firing
|
||
|
// bun query hooks).
|
||
|
//
|
||
|
// also holds per-dialect
|
||
|
// error hook function.
|
||
|
raw rawdb
|
||
|
|
||
|
// bun DB interface we use
|
||
|
// for dialects, and improved
|
||
|
// struct marshal/unmarshaling.
|
||
|
bun *bun.DB
|
||
|
}
|
||
|
|
||
|
// WrapDB wraps a bun database instance in our database type.
|
||
|
func WrapDB(db *bun.DB) *DB {
|
||
|
var errProc func(error) error
|
||
|
switch name := db.Dialect().Name(); name {
|
||
|
case dialect.PG:
|
||
|
errProc = processPostgresError
|
||
|
case dialect.SQLite:
|
||
|
errProc = processSQLiteError
|
||
|
default:
|
||
|
panic("unknown dialect name: " + name.String())
|
||
|
}
|
||
|
return &DB{
|
||
|
raw: rawdb{
|
||
|
errHook: errProc,
|
||
|
DB: db.DB,
|
||
|
},
|
||
|
bun: db,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Dialect is a direct call-through to bun.DB.Dialect().
|
||
|
func (db *DB) Dialect() schema.Dialect { return db.bun.Dialect() }
|
||
|
|
||
|
// AddQueryHook is a direct call-through to bun.DB.AddQueryHook().
|
||
|
func (db *DB) AddQueryHook(hook bun.QueryHook) { db.bun.AddQueryHook(hook) }
|
||
|
|
||
|
// RegisterModels is a direct call-through to bun.DB.RegisterModels().
|
||
|
func (db *DB) RegisterModel(models ...any) { db.bun.RegisterModel(models...) }
|
||
|
|
||
|
// PingContext is a direct call-through to bun.DB.PingContext().
|
||
|
func (db *DB) PingContext(ctx context.Context) error { return db.bun.PingContext(ctx) }
|
||
|
|
||
|
// Close is a direct call-through to bun.DB.Close().
|
||
|
func (db *DB) Close() error { return db.bun.Close() }
|
||
|
|
||
|
// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout.
|
||
|
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) {
|
||
|
bundb := db.bun // use *bun.DB interface to return bun.Tx type
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
tx, err = bundb.BeginTx(ctx, opts)
|
||
|
err = db.raw.errHook(err)
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
||
|
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
|
||
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
result, err = bundb.ExecContext(ctx, query, args...)
|
||
|
err = db.raw.errHook(err)
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
||
|
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
|
||
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
rows, err = bundb.QueryContext(ctx, query, args...)
|
||
|
err = db.raw.errHook(err)
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
||
|
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
|
||
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
||
|
_ = retryOnBusy(ctx, func() error {
|
||
|
row = bundb.QueryRowContext(ctx, query, args...)
|
||
|
err := db.raw.errHook(row.Err())
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts.
|
||
|
func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error {
|
||
|
// Attempt to start new transaction.
|
||
|
tx, err := db.BeginTx(ctx, nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
var done bool
|
||
|
|
||
|
defer func() {
|
||
|
if !done {
|
||
|
// Rollback (with retry-backoff).
|
||
|
_ = retryOnBusy(ctx, func() error {
|
||
|
err := tx.Rollback()
|
||
|
return db.raw.errHook(err)
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Perform supplied transaction
|
||
|
if err := fn(tx); err != nil {
|
||
|
return db.raw.errHook(err)
|
||
|
}
|
||
|
|
||
|
// Commit (with retry-backoff).
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
err := tx.Commit()
|
||
|
return db.raw.errHook(err)
|
||
|
})
|
||
|
done = true
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewValues(model interface{}) *bun.ValuesQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewValuesQuery(db.bun, model).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewMerge() *bun.MergeQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewMergeQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewSelect() *bun.SelectQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewSelectQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewInsert() *bun.InsertQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewInsertQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewUpdate() *bun.UpdateQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewUpdateQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewDelete() *bun.DeleteQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewDeleteQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewRaw(query string, args ...interface{}) *bun.RawQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewRawQuery(db.bun, query, args...).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewCreateTable() *bun.CreateTableQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewCreateTableQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewDropTable() *bun.DropTableQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewDropTableQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewCreateIndex() *bun.CreateIndexQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewCreateIndexQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewDropIndex() *bun.DropIndexQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewDropIndexQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewTruncateTable() *bun.TruncateTableQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewTruncateTableQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewAddColumn() *bun.AddColumnQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewAddColumnQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
func (db *DB) NewDropColumn() *bun.DropColumnQuery {
|
||
|
// note: passing in rawdb as conn iface so no double query-hook
|
||
|
// firing when passed through the bun.DB.Query___() functions.
|
||
|
return bun.NewDropColumnQuery(db.bun).Conn(&db.raw)
|
||
|
}
|
||
|
|
||
|
// Exists checks the results of a SelectQuery for the existence of the data in question, masking ErrNoEntries errors.
|
||
|
func (db *DB) Exists(ctx context.Context, query *bun.SelectQuery) (bool, error) {
|
||
|
exists, err := query.Exists(ctx)
|
||
|
switch err {
|
||
|
case nil:
|
||
|
return exists, nil
|
||
|
case sql.ErrNoRows:
|
||
|
return false, nil
|
||
|
default:
|
||
|
return false, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NotExists checks the results of a SelectQuery for the non-existence of the data in question, masking ErrNoEntries errors.
|
||
|
func (db *DB) NotExists(ctx context.Context, query *bun.SelectQuery) (bool, error) {
|
||
|
exists, err := db.Exists(ctx, query)
|
||
|
return !exists, err
|
||
|
}
|
||
|
|
||
|
type rawdb struct {
|
||
|
// dialect specific error
|
||
|
// processing function hook.
|
||
|
errHook func(error) error
|
||
|
|
||
|
// embedded raw
|
||
|
// db interface
|
||
|
*sql.DB
|
||
|
}
|
||
|
|
||
|
// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout.
|
||
|
func (db *rawdb) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
result, err = db.DB.ExecContext(ctx, query, args...)
|
||
|
err = db.errHook(err)
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout.
|
||
|
func (db *rawdb) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
|
||
|
err = retryOnBusy(ctx, func() error {
|
||
|
rows, err = db.DB.QueryContext(ctx, query, args...)
|
||
|
err = db.errHook(err)
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout.
|
||
|
func (db *rawdb) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
|
||
|
_ = retryOnBusy(ctx, func() error {
|
||
|
row = db.DB.QueryRowContext(ctx, query, args...)
|
||
|
err := db.errHook(row.Err())
|
||
|
return err
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// retryOnBusy will retry given function on returned 'errBusy'.
|
||
|
func retryOnBusy(ctx context.Context, fn func() error) error {
|
||
|
var backoff time.Duration
|
||
|
|
||
|
for i := 0; ; i++ {
|
||
|
// Perform func.
|
||
|
err := fn()
|
||
|
|
||
|
if err != errBusy {
|
||
|
// May be nil, or may be
|
||
|
// some other error, either
|
||
|
// way return here.
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// backoff according to a multiplier of 2ms * 2^2n,
|
||
|
// up to a maximum possible backoff time of 5 minutes.
|
||
|
//
|
||
|
// this works out as the following:
|
||
|
// 4ms
|
||
|
// 16ms
|
||
|
// 64ms
|
||
|
// 256ms
|
||
|
// 1.024s
|
||
|
// 4.096s
|
||
|
// 16.384s
|
||
|
// 1m5.536s
|
||
|
// 4m22.144s
|
||
|
backoff = 2 * time.Millisecond * (1 << (2*i + 1))
|
||
|
if backoff >= 5*time.Minute {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
// Context cancelled.
|
||
|
case <-ctx.Done():
|
||
|
|
||
|
// Backoff for some time.
|
||
|
case <-time.After(backoff):
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return gtserror.Newf("%w (waited > %s)", db.ErrBusyTimeout, backoff)
|
||
|
}
|