From 1ee99fc16583e4cefc46011dbc63ca8ed91d3189 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Thu, 31 Aug 2023 10:46:15 +0100 Subject: [PATCH] [bugfix] wrap bun.Tx to add our own error processing (#2169) * wrap bun.Tx to add our own error processing Signed-off-by: kim * add compile-time check for updateRowError() compatibility with sql.Row, fix wrapTx() not being used properly Signed-off-by: kim --------- Signed-off-by: kim --- internal/db/bundb/account.go | 6 +- internal/db/bundb/db.go | 296 ++++++++++++++++++++++++++++++----- internal/db/bundb/emoji.go | 2 +- internal/db/bundb/list.go | 4 +- internal/db/bundb/marker.go | 2 +- internal/db/bundb/media.go | 2 +- internal/db/bundb/status.go | 6 +- 7 files changed, 271 insertions(+), 47 deletions(-) diff --git a/internal/db/bundb/account.go b/internal/db/bundb/account.go index c88edebbf..43e5055e1 100644 --- a/internal/db/bundb/account.go +++ b/internal/db/bundb/account.go @@ -298,7 +298,7 @@ func (a *accountDB) PutAccount(ctx context.Context, account *gtsmodel.Account) e // It is safe to run this database transaction within cache.Store // as the cache does not attempt a mutex lock until AFTER hook. // - return a.db.RunInTx(ctx, func(tx bun.Tx) error { + return a.db.RunInTx(ctx, func(tx Tx) error { // create links between this account and any emojis it uses for _, i := range account.EmojiIDs { if _, err := tx.NewInsert().Model(>smodel.AccountToEmoji{ @@ -327,7 +327,7 @@ func (a *accountDB) UpdateAccount(ctx context.Context, account *gtsmodel.Account // It is safe to run this database transaction within cache.Store // as the cache does not attempt a mutex lock until AFTER hook. // - return a.db.RunInTx(ctx, func(tx bun.Tx) error { + return a.db.RunInTx(ctx, func(tx Tx) error { // create links between this account and any emojis it uses // first clear out any old emoji links if _, err := tx. @@ -375,7 +375,7 @@ func (a *accountDB) DeleteAccount(ctx context.Context, id string) error { return err } - return a.db.RunInTx(ctx, func(tx bun.Tx) error { + return a.db.RunInTx(ctx, func(tx Tx) error { // clear out any emoji links if _, err := tx. NewDelete(). diff --git a/internal/db/bundb/db.go b/internal/db/bundb/db.go index 9b6edcefe..2b19ba0c4 100644 --- a/internal/db/bundb/db.go +++ b/internal/db/bundb/db.go @@ -21,6 +21,7 @@ import ( "context" "database/sql" "time" + "unsafe" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtserror" @@ -66,7 +67,7 @@ func WrapDB(db *bun.DB) *DB { return &DB{ raw: rawdb{ errHook: errProc, - DB: db.DB, + db: db.DB, }, bun: db, } @@ -87,18 +88,7 @@ func (db *DB) PingContext(ctx context.Context) error { return db.bun.PingContext // Close is a direct call-through to bun.DB.Close(). func (db *DB) Close() error { return db.bun.Close() } -// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout. -func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) { - bundb := db.bun // use *bun.DB interface to return bun.Tx type - err = retryOnBusy(ctx, func() error { - tx, err = bundb.BeginTx(ctx, opts) - err = db.raw.errHook(err) - return err - }) - return -} - -// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout. +// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing. func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { bundb := db.bun // use underlying *bun.DB interface for their query formatting err = retryOnBusy(ctx, func() error { @@ -109,7 +99,7 @@ func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (resul return } -// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout. +// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing. func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { bundb := db.bun // use underlying *bun.DB interface for their query formatting err = retryOnBusy(ctx, func() error { @@ -120,19 +110,40 @@ func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows return } -// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout. +// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing. func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { bundb := db.bun // use underlying *bun.DB interface for their query formatting _ = retryOnBusy(ctx, func() error { row = bundb.QueryRowContext(ctx, query, args...) - err := db.raw.errHook(row.Err()) - return err + if err := db.raw.errHook(row.Err()); err != nil { + updateRowError(row, err) // set new error + } + return row.Err() }) return } +// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout and our own error processing. +func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx Tx, err error) { + var buntx bun.Tx // captured bun.Tx + bundb := db.bun // use *bun.DB interface to return bun.Tx type + + err = retryOnBusy(ctx, func() error { + buntx, err = bundb.BeginTx(ctx, opts) + err = db.raw.errHook(err) + return err + }) + + if err == nil { + // Wrap bun.Tx in our type. + tx = wrapTx(db, &buntx) + } + + return +} + // RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts. -func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error { +func (db *DB) RunInTx(ctx context.Context, fn func(Tx) error) error { // Attempt to start new transaction. tx, err := db.BeginTx(ctx, nil) if err != nil { @@ -143,24 +154,18 @@ func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error { defer func() { if !done { - // Rollback (with retry-backoff). - _ = retryOnBusy(ctx, func() error { - err := tx.Rollback() - return db.raw.errHook(err) - }) + // Rollback tx. + _ = tx.Rollback() } }() // Perform supplied transaction if err := fn(tx); err != nil { - return db.raw.errHook(err) + return err } - // Commit (with retry-backoff). - err = retryOnBusy(ctx, func() error { - err := tx.Commit() - return db.raw.errHook(err) - }) + // Commit tx. + err = tx.Commit() done = true return err } @@ -275,39 +280,258 @@ type rawdb struct { // embedded raw // db interface - *sql.DB + db *sql.DB } -// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout. +// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout and our own error processing. func (db *rawdb) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { err = retryOnBusy(ctx, func() error { - result, err = db.DB.ExecContext(ctx, query, args...) + result, err = db.db.ExecContext(ctx, query, args...) err = db.errHook(err) return err }) return } -// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout. +// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout and our own error processing. func (db *rawdb) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { err = retryOnBusy(ctx, func() error { - rows, err = db.DB.QueryContext(ctx, query, args...) + rows, err = db.db.QueryContext(ctx, query, args...) err = db.errHook(err) return err }) return } -// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout. +// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout and our own error processing. func (db *rawdb) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { _ = retryOnBusy(ctx, func() error { - row = db.DB.QueryRowContext(ctx, query, args...) + row = db.db.QueryRowContext(ctx, query, args...) err := db.errHook(row.Err()) return err }) return } +// Tx wraps a bun transaction instance +// to provide common per-dialect SQL error +// conversions to common types, and retries +// on busy commit/rollback (SQLite only). +type Tx struct { + // our own wrapped Tx type + // kept separate to the *bun.Tx + // type to be passed into query + // builders as bun.IConn iface + // (this prevents double firing + // bun query hooks). + // + // also holds per-dialect + // error hook function. + raw rawtx + + // bun Tx interface we use + // for dialects, and improved + // struct marshal/unmarshaling. + bun *bun.Tx +} + +// wrapTx wraps a given bun.Tx in our own wrapping Tx type. +func wrapTx(db *DB, tx *bun.Tx) Tx { + return Tx{ + raw: rawtx{ + errHook: db.raw.errHook, + tx: tx.Tx, + }, + bun: tx, + } +} + +// ExecContext wraps bun.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + buntx := tx.bun // use underlying *bun.Tx interface for their query formatting + res, err := buntx.ExecContext(ctx, query, args...) + err = tx.raw.errHook(err) + return res, err +} + +// QueryContext wraps bun.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + buntx := tx.bun // use underlying *bun.Tx interface for their query formatting + rows, err := buntx.QueryContext(ctx, query, args...) + err = tx.raw.errHook(err) + return rows, err +} + +// QueryRowContext wraps bun.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row { + buntx := tx.bun // use underlying *bun.Tx interface for their query formatting + row := buntx.QueryRowContext(ctx, query, args...) + if err := tx.raw.errHook(row.Err()); err != nil { + updateRowError(row, err) // set new error + } + return row +} + +// Commit wraps bun.Tx.Commit() with retry-busy timeout and our own error processing. +func (tx Tx) Commit() (err error) { + buntx := tx.bun // use *bun.Tx interface + err = retryOnBusy(context.TODO(), func() error { + err = buntx.Commit() + err = tx.raw.errHook(err) + return err + }) + return +} + +// Rollback wraps bun.Tx.Rollback() with retry-busy timeout and our own error processing. +func (tx Tx) Rollback() (err error) { + buntx := tx.bun // use *bun.Tx interface + err = retryOnBusy(context.TODO(), func() error { + err = buntx.Rollback() + err = tx.raw.errHook(err) + return err + }) + return +} + +// Dialect is a direct call-through to bun.DB.Dialect(). +func (tx Tx) Dialect() schema.Dialect { + return tx.bun.Dialect() +} + +func (tx Tx) NewValues(model interface{}) *bun.ValuesQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewValues(model).Conn(&tx.raw) +} + +func (tx Tx) NewMerge() *bun.MergeQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewMerge().Conn(&tx.raw) +} + +func (tx Tx) NewSelect() *bun.SelectQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewSelect().Conn(&tx.raw) +} + +func (tx Tx) NewInsert() *bun.InsertQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewInsert().Conn(&tx.raw) +} + +func (tx Tx) NewUpdate() *bun.UpdateQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewUpdate().Conn(&tx.raw) +} + +func (tx Tx) NewDelete() *bun.DeleteQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewDelete().Conn(&tx.raw) +} + +func (tx Tx) NewRaw(query string, args ...interface{}) *bun.RawQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewRaw(query, args...).Conn(&tx.raw) +} + +func (tx Tx) NewCreateTable() *bun.CreateTableQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewCreateTable().Conn(&tx.raw) +} + +func (tx Tx) NewDropTable() *bun.DropTableQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewDropTable().Conn(&tx.raw) +} + +func (tx Tx) NewCreateIndex() *bun.CreateIndexQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewCreateIndex().Conn(&tx.raw) +} + +func (tx Tx) NewDropIndex() *bun.DropIndexQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewDropIndex().Conn(&tx.raw) +} + +func (tx Tx) NewTruncateTable() *bun.TruncateTableQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewTruncateTable().Conn(&tx.raw) +} + +func (tx Tx) NewAddColumn() *bun.AddColumnQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewAddColumn().Conn(&tx.raw) +} + +func (tx Tx) NewDropColumn() *bun.DropColumnQuery { + // note: passing in rawtx as conn iface so no double query-hook + // firing when passed through the bun.Tx.Query___() functions. + return tx.bun.NewDropColumn().Conn(&tx.raw) +} + +type rawtx struct { + // dialect specific error + // processing function hook. + errHook func(error) error + + // embedded raw + // tx interface + tx *sql.Tx +} + +// ExecContext wraps sql.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx *rawtx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + res, err := tx.tx.ExecContext(ctx, query, args...) + err = tx.errHook(err) + return res, err +} + +// QueryContext wraps sql.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx *rawtx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + rows, err := tx.tx.QueryContext(ctx, query, args...) + err = tx.errHook(err) + return rows, err +} + +// QueryRowContext wraps sql.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction). +func (tx *rawtx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row { + row := tx.tx.QueryRowContext(ctx, query, args...) + if err := tx.errHook(row.Err()); err != nil { + updateRowError(row, err) // set new error + } + return row +} + +// updateRowError updates an sql.Row's internal error field using the unsafe package. +func updateRowError(sqlrow *sql.Row, err error) { + type row struct { + err error + rows *sql.Rows + } + + // compile-time check to ensure sql.Row not changed. + if unsafe.Sizeof(row{}) != unsafe.Sizeof(sql.Row{}) { + panic("sql.Row has changed definition") + } + + // this code is awful and i must be shamed for this. + (*row)(unsafe.Pointer(sqlrow)).err = err +} + // retryOnBusy will retry given function on returned 'errBusy'. func retryOnBusy(ctx context.Context, fn func() error) error { var backoff time.Duration diff --git a/internal/db/bundb/emoji.go b/internal/db/bundb/emoji.go index 2a3d91fe4..a3a19485d 100644 --- a/internal/db/bundb/emoji.go +++ b/internal/db/bundb/emoji.go @@ -105,7 +105,7 @@ func (e *emojiDB) DeleteEmojiByID(ctx context.Context, id string) error { return err } - return e.db.RunInTx(ctx, func(tx bun.Tx) error { + return e.db.RunInTx(ctx, func(tx Tx) error { // Delete relational links between this emoji // and any statuses using it, returning the // status IDs so we can later update them. diff --git a/internal/db/bundb/list.go b/internal/db/bundb/list.go index 23d9c13fb..7a117670a 100644 --- a/internal/db/bundb/list.go +++ b/internal/db/bundb/list.go @@ -206,7 +206,7 @@ func (l *listDB) DeleteListByID(ctx context.Context, id string) error { } }() - return l.db.RunInTx(ctx, func(tx bun.Tx) error { + return l.db.RunInTx(ctx, func(tx Tx) error { // Delete all entries attached to list. if _, err := tx.NewDelete(). Table("list_entries"). @@ -423,7 +423,7 @@ func (l *listDB) PutListEntries(ctx context.Context, entries []*gtsmodel.ListEnt }() // Finally, insert each list entry into the database. - return l.db.RunInTx(ctx, func(tx bun.Tx) error { + return l.db.RunInTx(ctx, func(tx Tx) error { for _, entry := range entries { entry := entry // rescope if err := l.state.Caches.GTS.ListEntry().Store(entry, func() error { diff --git a/internal/db/bundb/marker.go b/internal/db/bundb/marker.go index 861f7de36..5d365e08a 100644 --- a/internal/db/bundb/marker.go +++ b/internal/db/bundb/marker.go @@ -87,7 +87,7 @@ func (m *markerDB) UpdateMarker(ctx context.Context, marker *gtsmodel.Marker) er // Optimistic concurrency control: start a transaction, try to update a row with a previously retrieved version. // If the update in the transaction fails to actually change anything, another update happened concurrently, and // this update should be retried by the caller, which in this case involves sending HTTP 409 to the API client. - return m.db.RunInTx(ctx, func(tx bun.Tx) error { + return m.db.RunInTx(ctx, func(tx Tx) error { result, err := tx.NewUpdate(). Model(marker). WherePK(). diff --git a/internal/db/bundb/media.go b/internal/db/bundb/media.go index fe6aefa90..a2603eacc 100644 --- a/internal/db/bundb/media.go +++ b/internal/db/bundb/media.go @@ -122,7 +122,7 @@ func (m *mediaDB) DeleteAttachment(ctx context.Context, id string) error { defer m.state.Caches.GTS.Media().Invalidate("ID", id) // Delete media attachment in new transaction. - err = m.db.RunInTx(ctx, func(tx bun.Tx) error { + err = m.db.RunInTx(ctx, func(tx Tx) error { if media.AccountID != "" { var account gtsmodel.Account diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go index 0e97d32cc..26f0c1f38 100644 --- a/internal/db/bundb/status.go +++ b/internal/db/bundb/status.go @@ -276,7 +276,7 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error // It is safe to run this database transaction within cache.Store // as the cache does not attempt a mutex lock until AFTER hook. // - return s.db.RunInTx(ctx, func(tx bun.Tx) error { + return s.db.RunInTx(ctx, func(tx Tx) error { // create links between this status and any emojis it uses for _, i := range status.EmojiIDs { if _, err := tx. @@ -342,7 +342,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co // It is safe to run this database transaction within cache.Store // as the cache does not attempt a mutex lock until AFTER hook. // - return s.db.RunInTx(ctx, func(tx bun.Tx) error { + return s.db.RunInTx(ctx, func(tx Tx) error { // create links between this status and any emojis it uses for _, i := range status.EmojiIDs { if _, err := tx. @@ -420,7 +420,7 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error { // On return ensure status invalidated from cache. defer s.state.Caches.GTS.Status().Invalidate("ID", id) - return s.db.RunInTx(ctx, func(tx bun.Tx) error { + return s.db.RunInTx(ctx, func(tx Tx) error { // delete links between this status and any emojis it uses if _, err := tx. NewDelete().