diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index f6587a1b..5f8b9ad9 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -21,6 +21,7 @@ package federatingdb import ( "context" "sync" + "time" "github.com/go-fed/activity/pub" "github.com/go-fed/activity/streams/vocab" @@ -41,7 +42,9 @@ type DB interface { // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. // It doesn't care what the underlying implementation of the DB interface is, as long as it works. type federatingDB struct { - locks *sync.Map + mutex sync.Mutex + locks map[string]*mutex + pool sync.Pool db db.DB config *config.Config log *logrus.Logger @@ -50,11 +53,32 @@ type federatingDB struct { // New returns a DB interface using the given database, config, and logger. func New(db db.DB, config *config.Config, log *logrus.Logger) DB { - return &federatingDB{ - locks: new(sync.Map), + fdb := federatingDB{ + mutex: sync.Mutex{}, + locks: make(map[string]*mutex, 100), + pool: sync.Pool{New: func() interface{} { return &mutex{} }}, db: db, config: config, log: log, typeConverter: typeutils.NewConverter(config, db), } + go fdb.cleanupLocks() + return &fdb +} + +func (db *federatingDB) cleanupLocks() { + for { + // Sleep for a minute... + time.Sleep(time.Minute) + + // Delete unused locks from map + db.mutex.Lock() + for id, mu := range db.locks { + if !mu.inUse() { + delete(db.locks, id) + db.pool.Put(mu) + } + } + db.mutex.Unlock() + } } diff --git a/internal/federation/federatingdb/lock.go b/internal/federation/federatingdb/lock.go index c9062da8..0d35f337 100644 --- a/internal/federation/federatingdb/lock.go +++ b/internal/federation/federatingdb/lock.go @@ -23,6 +23,7 @@ import ( "errors" "net/url" "sync" + "sync/atomic" ) // Lock takes a lock for the object at the specified id. If an error @@ -45,14 +46,21 @@ func (f *federatingDB) Lock(c context.Context, id *url.URL) error { if id == nil { return errors.New("Lock: id was nil") } + idStr := id.String() - mu := &sync.Mutex{} - mu.Lock() // Optimistically lock if we do store it. - i, loaded := f.locks.LoadOrStore(id.String(), mu) - if loaded { - mu = i.(*sync.Mutex) - mu.Lock() + // Acquire map lock + f.mutex.Lock() + + // Get mutex, or create new + mu, ok := f.locks[idStr] + if !ok { + mu = f.pool.Get().(*mutex) + f.locks[idStr] = mu } + + // Unlock map, acquire mutex lock + f.mutex.Unlock() + mu.Lock() return nil } @@ -66,12 +74,43 @@ func (f *federatingDB) Unlock(c context.Context, id *url.URL) error { if id == nil { return errors.New("Unlock: id was nil") } + idStr := id.String() + + // Check map for mutex + f.mutex.Lock() + mu, ok := f.locks[idStr] + f.mutex.Unlock() - i, ok := f.locks.Load(id.String()) if !ok { return errors.New("missing an id in unlock") } - mu := i.(*sync.Mutex) + + // Unlock the mutex mu.Unlock() return nil } + +// mutex defines a mutex we can check the lock status of. +// this is not perfect, but it's good enough for a semi +// regular mutex cleanup routine +type mutex struct { + mu sync.Mutex + st uint32 +} + +// inUse returns if the mutex is in use +func (mu *mutex) inUse() bool { + return atomic.LoadUint32(&mu.st) == 1 +} + +// Lock acquire mutex lock +func (mu *mutex) Lock() { + mu.mu.Lock() + atomic.StoreUint32(&mu.st, 1) +} + +// Unlock releases mutex lock +func (mu *mutex) Unlock() { + mu.mu.Unlock() + atomic.StoreUint32(&mu.st, 0) +}