Queue: Improve logging

This commit is contained in:
Andrew Thornton 2019-12-11 20:34:05 +00:00
parent 2927bc6fe5
commit 9ad9070555
No known key found for this signature in database
GPG key ID: 3CDE74631F13A748
8 changed files with 115 additions and 62 deletions

View file

@ -12,6 +12,8 @@ import (
"sort"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
var manager *Manager
@ -96,6 +98,7 @@ func (m *Manager) Add(queue Queue,
}
m.Queues[desc.QID] = desc
m.mutex.Unlock()
log.Trace("Queue Manager registered: %s (QID: %d)", desc.Name, desc.QID)
return desc.QID
}
@ -104,6 +107,8 @@ func (m *Manager) Remove(qid int64) {
m.mutex.Lock()
delete(m.Queues, qid)
m.mutex.Unlock()
log.Trace("Queue Manager removed: QID: %d", qid)
}
// GetDescription by qid

View file

@ -91,16 +91,18 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
go l.readToChan()
log.Trace("%s Waiting til closed", l.name)
log.Trace("LevelQueue: %s Waiting til closed", l.name)
<-l.closed
log.Trace("%s Waiting til done", l.name)
log.Trace("LevelQueue: %s Waiting til done", l.name)
l.pool.Wait()
// FIXME: graceful: Needs HammerContext
log.Trace("%s Waiting til cleaned", l.name)
l.pool.CleanUp(context.TODO())
log.Trace("%s cleaned", l.name)
log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
l.pool.CleanUp(ctx)
cancel()
log.Trace("LevelQueue: %s Cleaned", l.name)
}
@ -115,7 +117,7 @@ func (l *LevelQueue) readToChan() {
bs, err := l.queue.RPop()
if err != nil {
if err != levelqueue.ErrNotFound {
log.Error("%s RPop: %v", l.name, err)
log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
}
time.Sleep(time.Millisecond * 100)
continue
@ -137,14 +139,14 @@ func (l *LevelQueue) readToChan() {
err = json.Unmarshal(bs, &data)
}
if err != nil {
log.Error("LevelQueue: %s failed to unmarshal: %v", l.name, err)
time.Sleep(time.Millisecond * 10)
log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("LevelQueue %s: task found: %#v", l.name, data)
log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
l.pool.Push(data)
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 10)
}
}
@ -170,7 +172,7 @@ func (l *LevelQueue) Push(data Data) error {
// Shutdown this queue and stop processing
func (l *LevelQueue) Shutdown() {
log.Trace("Shutdown: %s", l.name)
log.Trace("LevelQueue: %s Shutdown", l.name)
select {
case <-l.closed:
default:
@ -180,10 +182,16 @@ func (l *LevelQueue) Shutdown() {
// Terminate this queue and close the queue
func (l *LevelQueue) Terminate() {
log.Trace("Terminating: %s", l.name)
log.Trace("LevelQueue: %s Terminating", l.name)
l.Shutdown()
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
select {
case <-l.terminated:
default:
close(l.terminated)
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
}
}
}

View file

@ -133,22 +133,28 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
_ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
}()
log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
<-p.closed
log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
p.ChannelQueue.pool.cancel()
p.internal.(*LevelQueue).pool.cancel()
log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
p.ChannelQueue.pool.Wait()
p.internal.(*LevelQueue).pool.Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
for data := range p.ChannelQueue.pool.dataChan {
_ = p.internal.Push(data)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
}()
log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
}
// Shutdown processing this queue
func (p *PersistableChannelQueue) Shutdown() {
log.Trace("Shutdown: %s", p.delayedStarter.name)
log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
select {
case <-p.closed:
default:
@ -163,7 +169,7 @@ func (p *PersistableChannelQueue) Shutdown() {
// Terminate this queue and close the queue
func (p *PersistableChannelQueue) Terminate() {
log.Trace("Terminating: %s", p.delayedStarter.name)
log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
p.Shutdown()
p.lock.Lock()
defer p.lock.Unlock()

View file

@ -24,8 +24,8 @@ func TestPersistableChannelQueue(t *testing.T) {
}
}
var queueShutdown func()
var queueTerminate func()
queueShutdown := []func(){}
queueTerminate := []func(){}
tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data")
assert.NoError(t, err)
@ -40,9 +40,9 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
queueShutdown = shutdown
queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) {
queueTerminate = terminate
queueTerminate = append(queueTerminate, terminate)
})
test1 := testData{"A", 1}
@ -66,7 +66,9 @@ func TestPersistableChannelQueue(t *testing.T) {
err = queue.Push(test1)
assert.Error(t, err)
queueShutdown()
for _, callback := range queueShutdown {
callback()
}
time.Sleep(200 * time.Millisecond)
err = queue.Push(&test1)
assert.NoError(t, err)
@ -77,7 +79,9 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Fail(t, "Handler processing should have stopped")
default:
}
queueTerminate()
for _, callback := range queueTerminate {
callback()
}
// Reopen queue
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
@ -89,9 +93,9 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
queueShutdown = shutdown
queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) {
queueTerminate = terminate
queueTerminate = append(queueTerminate, terminate)
})
result3 := <-handleChan
@ -101,7 +105,11 @@ func TestPersistableChannelQueue(t *testing.T) {
result4 := <-handleChan
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
queueShutdown()
queueTerminate()
for _, callback := range queueShutdown {
callback()
}
for _, callback := range queueTerminate {
callback()
}
}

View file

@ -6,6 +6,7 @@ package queue
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
@ -23,11 +24,15 @@ func TestLevelQueue(t *testing.T) {
}
}
var queueShutdown func()
var queueTerminate func()
queueShutdown := []func(){}
queueTerminate := []func(){}
tmpDir, err := ioutil.TempDir("", "level-queue-test-data")
assert.NoError(t, err)
defer os.RemoveAll(tmpDir)
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
DataDir: "level-queue-test-data",
DataDir: tmpDir,
BatchLength: 2,
Workers: 1,
QueueLength: 20,
@ -38,9 +43,9 @@ func TestLevelQueue(t *testing.T) {
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
queueShutdown = shutdown
queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) {
queueTerminate = terminate
queueTerminate = append(queueTerminate, terminate)
})
test1 := testData{"A", 1}
@ -64,7 +69,9 @@ func TestLevelQueue(t *testing.T) {
err = queue.Push(test1)
assert.Error(t, err)
queueShutdown()
for _, callback := range queueShutdown {
callback()
}
time.Sleep(200 * time.Millisecond)
err = queue.Push(&test1)
assert.NoError(t, err)
@ -75,24 +82,30 @@ func TestLevelQueue(t *testing.T) {
assert.Fail(t, "Handler processing should have stopped")
default:
}
queueTerminate()
for _, callback := range queueTerminate {
callback()
}
// Reopen queue
queue, err = NewLevelQueue(handle, LevelQueueConfiguration{
DataDir: "level-queue-test-data",
BatchLength: 2,
Workers: 1,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
}, &testData{})
queue, err = NewWrappedQueue(handle,
WrappedQueueConfiguration{
Underlying: LevelQueueType,
Config: LevelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 2,
Workers: 1,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
},
}, &testData{})
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
queueShutdown = shutdown
queueShutdown = append(queueShutdown, shutdown)
}, func(_ context.Context, terminate func()) {
queueTerminate = terminate
queueTerminate = append(queueTerminate, terminate)
})
result3 := <-handleChan
@ -102,8 +115,10 @@ func TestLevelQueue(t *testing.T) {
result4 := <-handleChan
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
queueShutdown()
queueTerminate()
os.RemoveAll("level-queue-test-data")
for _, callback := range queueShutdown {
callback()
}
for _, callback := range queueTerminate {
callback()
}
}

View file

@ -116,10 +116,16 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))
go r.readToChan()
log.Trace("RedisQueue: %s Waiting til closed", r.name)
<-r.closed
log.Trace("RedisQueue: %s Waiting til done", r.name)
r.pool.Wait()
// FIXME: graceful: Needs HammerContext
r.pool.CleanUp(context.TODO())
log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
r.pool.CleanUp(ctx)
cancel()
}
func (r *RedisQueue) readToChan() {
@ -132,7 +138,7 @@ func (r *RedisQueue) readToChan() {
default:
bs, err := r.client.LPop(r.queueName).Bytes()
if err != nil && err != redis.Nil {
log.Error("RedisQueue: %s LPop failed: %v", r.name, err)
log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
time.Sleep(time.Millisecond * 100)
continue
}
@ -153,12 +159,12 @@ func (r *RedisQueue) readToChan() {
err = json.Unmarshal(bs, &data)
}
if err != nil {
log.Error("RedisQueue: %s Unmarshal: %v", r.name, err)
log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("RedisQueue: %s task found: %#v", r.name, data)
log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
r.pool.Push(data)
time.Sleep(time.Millisecond * 10)
}

View file

@ -92,7 +92,7 @@ type WrappedQueue struct {
// NewWrappedQueue will attempt to create a queue of the provided type,
// but if there is a problem creating this queue it will instead create
// a WrappedQueue with delayed the startup of the queue instead and a
// a WrappedQueue with delayed startup of the queue instead and a
// channel which will be redirected to the queue
func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
@ -162,11 +162,12 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
}
q.internal.Run(atShutdown, atTerminate)
log.Trace("WrappedQueue: %s Done", q.name)
}
// Shutdown this queue and stop processing
func (q *WrappedQueue) Shutdown() {
log.Trace("Shutdown: %s", q.name)
log.Trace("WrappedQueue: %s Shutdown", q.name)
q.lock.Lock()
defer q.lock.Unlock()
if q.internal == nil {
@ -179,7 +180,7 @@ func (q *WrappedQueue) Shutdown() {
// Terminate this queue and close the queue
func (q *WrappedQueue) Terminate() {
log.Trace("Terminating: %s", q.name)
log.Trace("WrappedQueue: %s Terminating", q.name)
q.lock.Lock()
defer q.lock.Unlock()
if q.internal == nil {

View file

@ -72,7 +72,7 @@ func (p *WorkerPool) pushBoost(data Data) {
ctx, cancel := context.WithCancel(p.baseCtx)
desc := GetManager().GetDescription(p.qid)
if desc != nil {
log.Warn("Worker Channel for %v blocked for %v - adding %d temporary workers for %s, block timeout now %v", desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
start := time.Now()
pid := desc.RegisterWorkers(p.boostWorkers, start, false, start, cancel)
@ -82,7 +82,7 @@ func (p *WorkerPool) pushBoost(data Data) {
cancel()
}()
} else {
log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
}
go func() {
<-time.After(p.boostTimeout)
@ -128,6 +128,10 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
desc.RemoveWorkers(pid)
cancel()
}()
log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, desc.Name, number, pid)
} else {
log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
}
p.addWorkers(ctx, number)
return cancel
@ -173,18 +177,18 @@ func (p *WorkerPool) Wait() {
// CleanUp will drain the remaining contents of the channel
// This should be called after AddWorkers context is closed
func (p *WorkerPool) CleanUp(ctx context.Context) {
log.Trace("CleanUp")
log.Trace("WorkerPool: %d CleanUp", p.qid)
close(p.dataChan)
for data := range p.dataChan {
p.handle(data)
select {
case <-ctx.Done():
log.Warn("Cleanup context closed before finishing clean-up")
log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
return
default:
}
}
log.Trace("CleanUp done")
log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}
func (p *WorkerPool) doWork(ctx context.Context) {