remove PopCtx() method, let others instead rely on Wait()

This commit is contained in:
kim 2024-04-04 16:17:05 +01:00
parent 7080930371
commit ba60397e30
2 changed files with 38 additions and 49 deletions

View file

@ -163,6 +163,7 @@ loop:
dlv.attempts > w.client.retries {
// Drop deliveries when no retry requested,
// or we reach max defined retry attempts.
// "bad" hosts support a max of 1 attempt.
w.client.badHosts.Set(dlv.host, struct{}{})
continue loop
}
@ -177,32 +178,39 @@ loop:
// next gets the next available delivery, blocking until available if necessary.
func (w *APDeliveryWorker) next(ctx context.Context) (*delivery, bool) {
// Try pop next queued.
msg, ok := w.queue.Pop()
loop:
for {
// Try pop next queued.
msg, ok := w.queue.Pop()
if !ok {
// Check the backlog.
if len(w.backlog) > 0 {
// Sort by 'next' time.
sortDeliveries(w.backlog)
// Pop next delivery.
dlv := w.popBacklog()
return dlv, true
}
// Backlog is empty, we MUST
// block until next enqueued.
msg, ok = w.queue.PopCtx(ctx)
if !ok {
return nil, false
}
}
// Check the backlog.
if len(w.backlog) > 0 {
// Wrap msg in delivery type.
return wrapMsg(ctx, msg), true
// Sort by 'next' time.
sortDeliveries(w.backlog)
// Pop next delivery.
dlv := w.popBacklog()
return dlv, true
}
select {
// Backlog is empty, we MUST
// block until next enqueued.
case <-w.queue.Wait():
continue loop
// Worker was stopped.
case <-ctx.Done():
return nil, false
}
}
// Wrap msg in delivery type.
return wrapMsg(ctx, msg), true
}
}
// popBacklog pops next available from the backlog.

View file

@ -18,13 +18,13 @@
package queue
import (
"context"
"sync/atomic"
"codeberg.org/gruf/go-structr"
)
// StructQueue ...
// StructQueue wraps a structr.Queue{} to
// provide simple index caching by name.
type StructQueue[StructType any] struct {
queue structr.Queue[StructType]
index map[string]*structr.Index
@ -46,36 +46,16 @@ func (q *StructQueue[T]) Pop() (value T, ok bool) {
return q.queue.PopFront()
}
// PopCtx wraps structr.Queue{}.PopFront() to add sleep until value is available.
func (q *StructQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) {
for {
// Try pop from front of queue.
value, ok = q.queue.PopFront()
if ok {
return
}
select {
// Context canceled.
case <-ctx.Done():
return
// Waiter released.
case <-q.Wait():
}
}
}
// Push wraps structr.Queue{}.PushBack() to add sleeping pop goroutine awakening.
// Push wraps structr.Queue{}.PushBack() to awaken those blocking on <-.Wait().
func (q *StructQueue[T]) Push(values ...T) {
q.queue.PushBack(values...)
q.broadcast()
}
// Delete removes all queued entries under index with key.
// Delete pops (and drops!) all queued entries under index with key.
func (q *StructQueue[T]) Delete(index string, key ...any) {
i := q.index[index]
q.queue.Pop(i, i.Key(key...))
_ = q.queue.Pop(i, i.Key(key...))
}
// Len: see structr.Queue{}.Len().
@ -83,7 +63,8 @@ func (q *StructQueue[T]) Len() int {
return q.queue.Len()
}
// Wait safely returns current (read-only) wait channel.
// Wait returns current wait channel, which may be
// blocked on to awaken when new value pushed to queue.
func (q *StructQueue[T]) Wait() <-chan struct{} {
var ch chan struct{}