forgejo/modules/queue/queue_bytefifo.go
zeripath e22ee468cf
Exponential Backoff for ByteFIFO (#15724)
This PR is another in the vein of queue improvements. It suggests an
exponential backoff for bytefifo queues to reduce the load from queue
polling. This will mostly be useful for redis queues.

Signed-off-by: Andrew Thornton <art27@cantab.net>

Co-authored-by: Lauris BH <lauris@nix.lv>
2021-05-08 17:29:47 +01:00

269 lines
6.4 KiB
Go

// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
jsoniter "github.com/json-iterator/go"
)
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
Name string
}
var _ Queue = &ByteFIFOQueue{}
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
*WorkerPool
byteFIFO ByteFIFO
typ Type
closed chan struct{}
terminated chan struct{}
exemplar interface{}
workers int
name string
lock sync.Mutex
}
// NewByteFIFOQueue creates a new ByteFIFOQueue
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
return &ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
closed: make(chan struct{}),
terminated: make(chan struct{}),
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
}, nil
}
// Name returns the name of this queue
func (q *ByteFIFOQueue) Name() string {
return q.name
}
// Push pushes data to the fifo
func (q *ByteFIFOQueue) Push(data Data) error {
return q.PushFunc(data, nil)
}
// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary
bs, err := json.Marshal(data)
if err != nil {
return err
}
return q.byteFIFO.PushFunc(bs, fn)
}
// IsEmpty checks if the queue is empty
func (q *ByteFIFOQueue) IsEmpty() bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.WorkerPool.IsEmpty() {
return false
}
return q.byteFIFO.Len() == 0
}
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), q.Shutdown)
atTerminate(context.Background(), q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
go func() {
_ = q.AddWorkers(q.workers, 0)
}()
go q.readToChan()
log.Trace("%s: %s Waiting til closed", q.typ, q.name)
<-q.closed
log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait()
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
q.CleanUp(ctx)
cancel()
}
func (q *ByteFIFOQueue) readToChan() {
// handle quick cancels
select {
case <-q.closed:
// tell the pool to shutdown.
q.cancel()
return
default:
}
backOffTime := time.Millisecond * 100
maxBackOffTime := time.Second * 3
for {
success, resetBackoff := q.doPop()
if resetBackoff {
backOffTime = 100 * time.Millisecond
}
if success {
select {
case <-q.closed:
// tell the pool to shutdown.
q.cancel()
return
default:
}
} else {
select {
case <-q.closed:
// tell the pool to shutdown.
q.cancel()
return
case <-time.After(backOffTime):
}
backOffTime += backOffTime / 2
if backOffTime > maxBackOffTime {
backOffTime = maxBackOffTime
}
}
}
}
func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
q.lock.Lock()
defer q.lock.Unlock()
bs, err := q.byteFIFO.Pop()
if err != nil {
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
return
}
if len(bs) == 0 {
return
}
resetBackoff = true
data, err := unmarshalAs(bs, q.exemplar)
if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
return
}
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data)
success = true
return
}
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)
q.lock.Lock()
select {
case <-q.closed:
default:
close(q.closed)
}
q.lock.Unlock()
log.Debug("%s: %s Shutdown", q.typ, q.name)
}
// IsShutdown returns a channel which is closed when this Queue is shutdown
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
return q.closed
}
// Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown()
q.lock.Lock()
select {
case <-q.terminated:
q.lock.Unlock()
return
default:
}
close(q.terminated)
q.lock.Unlock()
if log.IsDebug() {
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
}
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
log.Debug("%s: %s Terminated", q.typ, q.name)
}
// IsTerminated returns a channel which is closed when this Queue is terminated
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
return q.terminated
}
var _ UniqueQueue = &ByteFIFOUniqueQueue{}
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
type ByteFIFOUniqueQueue struct {
ByteFIFOQueue
}
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
closed: make(chan struct{}),
terminated: make(chan struct{}),
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
},
}, nil
}
// Has checks if the provided data is in the queue
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if !assignableTo(data, q.exemplar) {
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary
bs, err := json.Marshal(data)
if err != nil {
return false, err
}
return q.byteFIFO.(UniqueByteFIFO).Has(bs)
}