From eda05e5af3eacdbe70b7cbf069ef5722812a407a Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 29 Apr 2024 11:37:22 +0100 Subject: [PATCH] updates the simple queue memory pool to actually self-clean + limit growth --- internal/queue/simple.go | 87 +++++++++++++++++++++++++++++++++------- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/internal/queue/simple.go b/internal/queue/simple.go index 9307183f2..a4d675f39 100644 --- a/internal/queue/simple.go +++ b/internal/queue/simple.go @@ -29,33 +29,58 @@ import ( // elements to reduce overall memory usage. type SimpleQueue[T any] struct { l list.List[T] - p []*list.Elem[T] + p elemPool[T] w chan struct{} m sync.Mutex + n uint8 } // Push will push given value to the queue. func (q *SimpleQueue[T]) Push(value T) { q.m.Lock() - elem := q.alloc() + + // Wrap in element. + elem := q.p.alloc() elem.Value = value + + // Push new elem to queue. q.l.PushElemFront(elem) + if q.w != nil { + // Notify any goroutines + // blocking on q.Wait(), + // or on PopCtx(...). close(q.w) q.w = nil } + q.m.Unlock() } // Pop will attempt to pop value from the queue. func (q *SimpleQueue[T]) Pop() (value T, ok bool) { q.m.Lock() + + // Check for a tail (i.e. not empty). if ok = (q.l.Tail != nil); ok { + + // Extract value. tail := q.l.Tail value = tail.Value + + // Remove tail. q.l.Remove(tail) - q.free(tail) + q.p.free(tail) + + if q.l.Len() == 0 { + // Every 255x we reach a zero + // length queue, sweep mem pool. + if q.n++; q.n == ^uint8(0) { + q.p.GC() + } + } } + q.m.Unlock() return } @@ -105,7 +130,15 @@ func (q *SimpleQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) { // Remove element. q.l.Remove(elem) - q.free(elem) + q.p.free(elem) + + if q.l.Len() == 0 { + // Every 255x we reach a zero + // length queue, sweep mem pool. + if q.n++; q.n == ^uint8(0) { + q.p.GC() + } + } // Done with lock. q.m.Unlock() @@ -121,21 +154,45 @@ func (q *SimpleQueue[T]) Len() int { return l } -// alloc will allocate new list element (relying on memory pool). -func (q *SimpleQueue[T]) alloc() *list.Elem[T] { - if len(q.p) > 0 { - elem := q.p[len(q.p)-1] - q.p = q.p[:len(q.p)-1] - return elem - } - return new(list.Elem[T]) +// elemPool is a very simple +// list.Elem[T] memory pool. +type elemPool[T any] struct { + current []*list.Elem[T] + victim []*list.Elem[T] } -// free will free list element and release to pool. -func (q *SimpleQueue[T]) free(elem *list.Elem[T]) { +func (p *elemPool[T]) alloc() *list.Elem[T] { + // First try the current queue + if l := len(p.current) - 1; l >= 0 { + mu := p.current[l] + p.current = p.current[:l] + return mu + } + + // Next try the victim queue. + if l := len(p.victim) - 1; l >= 0 { + mu := p.victim[l] + p.victim = p.victim[:l] + return mu + } + + // Lastly, alloc new. + mu := new(list.Elem[T]) + return mu +} + +// free will release given element to pool. +func (p *elemPool[T]) free(elem *list.Elem[T]) { var zero T elem.Next = nil elem.Prev = nil elem.Value = zero - q.p = append(q.p, elem) + p.current = append(p.current, elem) +} + +// GC will clear out unused entries from the elemPool. +func (p *elemPool[T]) GC() { + current := p.current + p.current = nil + p.victim = current }