diff --git a/custom/conf/app.ini.sample b/custom/conf/app.ini.sample index c9ca821280..4b810f91f7 100644 --- a/custom/conf/app.ini.sample +++ b/custom/conf/app.ini.sample @@ -371,6 +371,19 @@ REPO_INDEXER_INCLUDE = ; A comma separated list of glob patterns to exclude from the index; ; default is empty REPO_INDEXER_EXCLUDE = +[queue] +; General queue queue type, currently support: persistable-channel, channel, level, redis, dummy +; default to persistable-channel +TYPE = persistable-channel +; data-dir for storing persistable queues and level queues, individual queues will be named by their type +DATADIR = queues/ +; Default queue length before a channel queue will block +LENGTH = 20 +; Batch size to send for batched queues +BATCH_LENGTH = 20 +; Connection string for redis queues this will store the redis connection string. +CONN_STR = "addrs=127.0.0.1:6379 db=0" + [admin] ; Disallow regular (non-admin) users from creating organizations. DISABLE_REGULAR_ORG_CREATION = false diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index e71fb1b3bc..2db543f5e6 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -234,6 +234,14 @@ relation to port exhaustion. - `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed. - `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout. +## Queue (`queue`) + +- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `batched-channel`, `channel`, `level`, `redis`, `dummy` +- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. +- `LENGTH`: **20**: Maximal queue size before channel queues block +- `BATCH_LENGTH`: **20**: Batch data before passing to the handler +- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type. + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled diff --git a/modules/queue/queue.go b/modules/queue/queue.go new file mode 100644 index 0000000000..1220db5c03 --- /dev/null +++ b/modules/queue/queue.go @@ -0,0 +1,128 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" +) + +// ErrInvalidConfiguration is called when there is invalid configuration for a queue +type ErrInvalidConfiguration struct { + cfg interface{} + err error +} + +func (err ErrInvalidConfiguration) Error() string { + if err.err != nil { + return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err) + } + return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg) +} + +// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration +func IsErrInvalidConfiguration(err error) bool { + _, ok := err.(ErrInvalidConfiguration) + return ok +} + +// Type is a type of Queue +type Type string + +// Data defines an type of queuable data +type Data interface{} + +// HandlerFunc is a function that takes a variable amount of data and processes it +type HandlerFunc func(...Data) + +// NewQueueFunc is a function that creates a queue +type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error) + +// Shutdownable represents a queue that can be shutdown +type Shutdownable interface { + Shutdown() + Terminate() +} + +// Queue defines an interface to save an issue indexer queue +type Queue interface { + Run(atShutdown, atTerminate func(context.Context, func())) + Push(Data) error +} + +// DummyQueueType is the type for the dummy queue +const DummyQueueType Type = "dummy" + +// NewDummyQueue creates a new DummyQueue +func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) { + return &DummyQueue{}, nil +} + +// DummyQueue represents an empty queue +type DummyQueue struct { +} + +// Run starts to run the queue +func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} + +// Push pushes data to the queue +func (b *DummyQueue) Push(Data) error { + return nil +} + +func toConfig(exemplar, cfg interface{}) (interface{}, error) { + if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { + return cfg, nil + } + + configBytes, ok := cfg.([]byte) + if !ok { + configStr, ok := cfg.(string) + if !ok { + return nil, ErrInvalidConfiguration{cfg: cfg} + } + configBytes = []byte(configStr) + } + newVal := reflect.New(reflect.TypeOf(exemplar)) + if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { + return nil, ErrInvalidConfiguration{cfg: cfg, err: err} + } + return newVal.Elem().Interface(), nil +} + +var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} + +// RegisteredTypes provides the list of requested types of queues +func RegisteredTypes() []Type { + types := make([]Type, len(queuesMap)) + i := 0 + for key := range queuesMap { + types[i] = key + i++ + } + return types +} + +// RegisteredTypesAsString provides the list of requested types of queues +func RegisteredTypesAsString() []string { + types := make([]string, len(queuesMap)) + i := 0 + for key := range queuesMap { + types[i] = string(key) + i++ + } + return types +} + +// CreateQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error +func CreateQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { + newFn, ok := queuesMap[queueType] + if !ok { + return nil, fmt.Errorf("Unsupported queue type: %v", queueType) + } + return newFn(handlerFunc, opts, exemplar) +} diff --git a/modules/queue/queue_batch.go b/modules/queue/queue_batch.go new file mode 100644 index 0000000000..07166441e6 --- /dev/null +++ b/modules/queue/queue_batch.go @@ -0,0 +1,78 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// BatchedChannelQueueType is the type for batched channel queue +const BatchedChannelQueueType Type = "batched-channel" + +// BatchedChannelQueueConfiguration is the configuration for a BatchedChannelQueue +type BatchedChannelQueueConfiguration struct { + QueueLength int + BatchLength int +} + +// BatchedChannelQueue implements +type BatchedChannelQueue struct { + *ChannelQueue + batchLength int +} + +// NewBatchedChannelQueue create a memory channel queue +func NewBatchedChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(BatchedChannelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(BatchedChannelQueueConfiguration) + return &BatchedChannelQueue{ + &ChannelQueue{ + queue: make(chan Data, config.QueueLength), + handle: handle, + exemplar: exemplar, + }, + config.BatchLength, + }, nil +} + +// Run starts to run the queue +func (c *BatchedChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), func() { + log.Warn("BatchedChannelQueue is not shutdownable!") + }) + atTerminate(context.Background(), func() { + log.Warn("BatchedChannelQueue is not terminatable!") + }) + go func() { + delay := time.Millisecond * 300 + var datas = make([]Data, 0, c.batchLength) + for { + select { + case data := <-c.queue: + datas = append(datas, data) + if len(datas) >= c.batchLength { + c.handle(datas...) + datas = make([]Data, 0, c.batchLength) + } + case <-time.After(delay): + delay = time.Millisecond * 100 + if len(datas) > 0 { + c.handle(datas...) + datas = make([]Data, 0, c.batchLength) + } + } + } + }() +} + +func init() { + queuesMap[BatchedChannelQueueType] = NewBatchedChannelQueue +} diff --git a/modules/queue/queue_batch_test.go b/modules/queue/queue_batch_test.go new file mode 100644 index 0000000000..08d3641da1 --- /dev/null +++ b/modules/queue/queue_batch_test.go @@ -0,0 +1,46 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import "testing" + +import "github.com/stretchr/testify/assert" + +import "context" + +func TestBatchedChannelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + nilFn := func(_ context.Context, _ func()) {} + + queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2}, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go new file mode 100644 index 0000000000..e0cba2db01 --- /dev/null +++ b/modules/queue/queue_channel.go @@ -0,0 +1,75 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "reflect" + + "code.gitea.io/gitea/modules/log" +) + +// ChannelQueueType is the type for channel queue +const ChannelQueueType Type = "channel" + +// ChannelQueueConfiguration is the configuration for a ChannelQueue +type ChannelQueueConfiguration struct { + QueueLength int +} + +// ChannelQueue implements +type ChannelQueue struct { + queue chan Data + handle HandlerFunc + exemplar interface{} +} + +// NewChannelQueue create a memory channel queue +func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ChannelQueueConfiguration) + return &ChannelQueue{ + queue: make(chan Data, config.QueueLength), + handle: handle, + exemplar: exemplar, + }, nil +} + +// Run starts to run the queue +func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), func() { + log.Warn("ChannelQueue is not shutdownable!") + }) + atTerminate(context.Background(), func() { + log.Warn("ChannelQueue is not terminatable!") + }) + go func() { + for data := range c.queue { + c.handle(data) + } + }() +} + +// Push will push the indexer data to queue +func (c *ChannelQueue) Push(data Data) error { + if c.exemplar != nil { + // Assert data is of same type as r.exemplar + t := reflect.TypeOf(data) + exemplarType := reflect.TypeOf(c.exemplar) + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) + } + } + c.queue <- data + return nil +} + +func init() { + queuesMap[ChannelQueueType] = NewChannelQueue +} diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go new file mode 100644 index 0000000000..77f4a8fe8f --- /dev/null +++ b/modules/queue/queue_channel_test.go @@ -0,0 +1,38 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChannelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + nilFn := func(_ context.Context, _ func()) {} + + queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20}, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + go queue.Push(&test1) + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go new file mode 100644 index 0000000000..dafff5c21c --- /dev/null +++ b/modules/queue/queue_disk.go @@ -0,0 +1,158 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "time" + + "code.gitea.io/gitea/modules/log" + + "gitea.com/lunny/levelqueue" +) + +// LevelQueueType is the type for level queue +const LevelQueueType Type = "level" + +// LevelQueueConfiguration is the configuration for a LevelQueue +type LevelQueueConfiguration struct { + DataDir string + BatchLength int +} + +// LevelQueue implements a disk library queue +type LevelQueue struct { + handle HandlerFunc + queue *levelqueue.Queue + batchLength int + closed chan struct{} + exemplar interface{} +} + +// NewLevelQueue creates a ledis local queue +func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(LevelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(LevelQueueConfiguration) + + queue, err := levelqueue.Open(config.DataDir) + if err != nil { + return nil, err + } + + return &LevelQueue{ + handle: handle, + queue: queue, + batchLength: config.BatchLength, + exemplar: exemplar, + closed: make(chan struct{}), + }, nil +} + +// Run starts to run the queue +func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), l.Shutdown) + atTerminate(context.Background(), l.Terminate) + var i int + var datas = make([]Data, 0, l.batchLength) + for { + select { + case <-l.closed: + if len(datas) > 0 { + log.Trace("Handling: %d data, %v", len(datas), datas) + l.handle(datas...) + } + return + default: + } + i++ + if len(datas) > l.batchLength || (len(datas) > 0 && i > 3) { + log.Trace("Handling: %d data, %v", len(datas), datas) + l.handle(datas...) + datas = make([]Data, 0, l.batchLength) + i = 0 + continue + } + + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error("RPop: %v", err) + } + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data Data + if l.exemplar != nil { + t := reflect.TypeOf(l.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 10) + continue + } + + log.Trace("LevelQueue: task found: %#v", data) + + datas = append(datas, data) + } +} + +// Push will push the indexer data to queue +func (l *LevelQueue) Push(data Data) error { + if l.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(l.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) + } + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return l.queue.LPush(bs) +} + +// Shutdown this queue and stop processing +func (l *LevelQueue) Shutdown() { + select { + case <-l.closed: + default: + close(l.closed) + } +} + +// Terminate this queue and close the queue +func (l *LevelQueue) Terminate() { + l.Shutdown() + if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { + log.Error("Error whilst closing internal queue: %v", err) + } + +} + +func init() { + queuesMap[LevelQueueType] = NewLevelQueue +} diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go new file mode 100644 index 0000000000..b13f1b9603 --- /dev/null +++ b/modules/queue/queue_disk_channel.go @@ -0,0 +1,160 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "time" +) + +// PersistableChannelQueueType is the type for persistable queue +const PersistableChannelQueueType Type = "persistable-channel" + +// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue +type PersistableChannelQueueConfiguration struct { + DataDir string + BatchLength int + QueueLength int + Timeout time.Duration + MaxAttempts int +} + +// PersistableChannelQueue wraps a channel queue and level queue together +type PersistableChannelQueue struct { + *BatchedChannelQueue + delayedStarter + closed chan struct{} +} + +// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down +// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate +func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(PersistableChannelQueueConfiguration) + + batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + }, exemplar) + if err != nil { + return nil, err + } + + levelCfg := LevelQueueConfiguration{ + DataDir: config.DataDir, + BatchLength: config.BatchLength, + } + + levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + if err == nil { + return &PersistableChannelQueue{ + BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue), + delayedStarter: delayedStarter{ + internal: levelQueue.(*LevelQueue), + }, + closed: make(chan struct{}), + }, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + return &PersistableChannelQueue{ + BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue), + delayedStarter: delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + }, + closed: make(chan struct{}), + }, nil +} + +// Push will push the indexer data to queue +func (p *PersistableChannelQueue) Push(data Data) error { + select { + case <-p.closed: + return p.internal.Push(data) + default: + return p.BatchedChannelQueue.Push(data) + } +} + +// Run starts to run the queue +func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + p.lock.Lock() + if p.internal == nil { + p.setInternal(atShutdown, p.handle, p.exemplar) + } else { + p.lock.Unlock() + } + atShutdown(context.Background(), p.Shutdown) + atTerminate(context.Background(), p.Terminate) + + // Just run the level queue - we shut it down later + go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + delay := time.Millisecond * 300 + var datas = make([]Data, 0, p.batchLength) +loop: + for { + select { + case data := <-p.queue: + datas = append(datas, data) + if len(datas) >= p.batchLength { + p.handle(datas...) + datas = make([]Data, 0, p.batchLength) + } + case <-time.After(delay): + delay = time.Millisecond * 100 + if len(datas) > 0 { + p.handle(datas...) + datas = make([]Data, 0, p.batchLength) + } + case <-p.closed: + if len(datas) > 0 { + p.handle(datas...) + } + break loop + } + } + go func() { + for data := range p.queue { + _ = p.internal.Push(data) + } + }() +} + +// Shutdown processing this queue +func (p *PersistableChannelQueue) Shutdown() { + select { + case <-p.closed: + default: + close(p.closed) + p.lock.Lock() + defer p.lock.Unlock() + if p.internal != nil { + p.internal.(*LevelQueue).Shutdown() + } + } +} + +// Terminate this queue and close the queue +func (p *PersistableChannelQueue) Terminate() { + p.Shutdown() + p.lock.Lock() + defer p.lock.Unlock() + if p.internal != nil { + p.internal.(*LevelQueue).Terminate() + } +} + +func init() { + queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue +} diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go new file mode 100644 index 0000000000..66c90f3bc3 --- /dev/null +++ b/modules/queue/queue_disk_channel_test.go @@ -0,0 +1,105 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPersistableChannelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + var queueShutdown func() + var queueTerminate func() + + tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = shutdown + }, func(_ context.Context, terminate func()) { + queueTerminate = terminate + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + go func() { + err = queue.Push(&test2) + assert.NoError(t, err) + }() + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) + + queueShutdown() + time.Sleep(200 * time.Millisecond) + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + queueTerminate() + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = shutdown + }, func(_ context.Context, terminate func()) { + queueTerminate = terminate + }) + + result3 := <-handleChan + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + result4 := <-handleChan + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + queueShutdown() + queueTerminate() + +} diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go new file mode 100644 index 0000000000..9bc689b5f0 --- /dev/null +++ b/modules/queue/queue_disk_test.go @@ -0,0 +1,99 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLevelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + var queueShutdown func() + var queueTerminate func() + + queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ + DataDir: "level-queue-test-data", + BatchLength: 2, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = shutdown + }, func(_ context.Context, terminate func()) { + queueTerminate = terminate + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + go func() { + err = queue.Push(&test2) + assert.NoError(t, err) + }() + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) + + queueShutdown() + time.Sleep(200 * time.Millisecond) + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + queueTerminate() + + // Reopen queue + queue, err = NewLevelQueue(handle, LevelQueueConfiguration{ + DataDir: "level-queue-test-data", + BatchLength: 2, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = shutdown + }, func(_ context.Context, terminate func()) { + queueTerminate = terminate + }) + + result3 := <-handleChan + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + result4 := <-handleChan + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + queueShutdown() + queueTerminate() + + os.RemoveAll("level-queue-test-data") +} diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go new file mode 100644 index 0000000000..b785f0073f --- /dev/null +++ b/modules/queue/queue_redis.go @@ -0,0 +1,190 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/go-redis/redis" +) + +// RedisQueueType is the type for redis queue +const RedisQueueType Type = "redis" + +type redisClient interface { + RPush(key string, args ...interface{}) *redis.IntCmd + LPop(key string) *redis.StringCmd + Ping() *redis.StatusCmd + Close() error +} + +// RedisQueue redis queue +type RedisQueue struct { + client redisClient + queueName string + handle HandlerFunc + batchLength int + closed chan struct{} + exemplar interface{} +} + +// RedisQueueConfiguration is the configuration for the redis queue +type RedisQueueConfiguration struct { + Addresses string + Password string + DBIndex int + BatchLength int + QueueName string +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisQueueConfiguration) + + dbs := strings.Split(config.Addresses, ",") + var queue = RedisQueue{ + queueName: config.QueueName, + handle: handle, + batchLength: config.BatchLength, + exemplar: exemplar, + closed: make(chan struct{}), + } + if len(dbs) == 0 { + return nil, errors.New("no redis host found") + } else if len(dbs) == 1 { + queue.client = redis.NewClient(&redis.Options{ + Addr: strings.TrimSpace(dbs[0]), // use default Addr + Password: config.Password, // no password set + DB: config.DBIndex, // use default DB + }) + } else { + queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: dbs, + }) + } + if err := queue.client.Ping().Err(); err != nil { + return nil, err + } + return &queue, nil +} + +// Run runs the redis queue +func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), r.Shutdown) + atTerminate(context.Background(), r.Terminate) + var i int + var datas = make([]Data, 0, r.batchLength) + for { + select { + case <-r.closed: + if len(datas) > 0 { + log.Trace("Handling: %d data, %v", len(datas), datas) + r.handle(datas...) + } + return + default: + } + bs, err := r.client.LPop(r.queueName).Bytes() + if err != nil && err != redis.Nil { + log.Error("LPop failed: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > r.batchLength || (len(datas) > 0 && i > 3) { + log.Trace("Handling: %d data, %v", len(datas), datas) + r.handle(datas...) + datas = make([]Data, 0, r.batchLength) + i = 0 + } + + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data Data + if r.exemplar != nil { + t := reflect.TypeOf(r.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("RedisQueue: task found: %#v", data) + + datas = append(datas, data) + select { + case <-r.closed: + if len(datas) > 0 { + log.Trace("Handling: %d data, %v", len(datas), datas) + r.handle(datas...) + } + return + default: + } + time.Sleep(time.Millisecond * 100) + } +} + +// Push implements Queue +func (r *RedisQueue) Push(data Data) error { + if r.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(r.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) + } + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return r.client.RPush(r.queueName, bs).Err() +} + +// Shutdown processing from this queue +func (r *RedisQueue) Shutdown() { + select { + case <-r.closed: + default: + close(r.closed) + } +} + +// Terminate this queue and close the queue +func (r *RedisQueue) Terminate() { + r.Shutdown() + if err := r.client.Close(); err != nil { + log.Error("Error whilst closing internal redis client: %v", err) + } +} + +func init() { + queuesMap[RedisQueueType] = NewRedisQueue +} diff --git a/modules/queue/queue_test.go b/modules/queue/queue_test.go new file mode 100644 index 0000000000..e41643da21 --- /dev/null +++ b/modules/queue/queue_test.go @@ -0,0 +1,42 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import "testing" + +import "github.com/stretchr/testify/assert" + +import "encoding/json" + +type testData struct { + TestString string + TestInt int +} + +func TestToConfig(t *testing.T) { + cfg := testData{ + TestString: "Config", + TestInt: 10, + } + exemplar := testData{} + + cfg2I, err := toConfig(exemplar, cfg) + assert.NoError(t, err) + cfg2, ok := (cfg2I).(testData) + assert.True(t, ok) + assert.NotEqual(t, cfg2, exemplar) + assert.Equal(t, &cfg, &cfg2) + + cfgString, err := json.Marshal(cfg) + assert.NoError(t, err) + + cfg3I, err := toConfig(exemplar, cfgString) + assert.NoError(t, err) + cfg3, ok := (cfg3I).(testData) + assert.True(t, ok) + assert.Equal(t, cfg.TestString, cfg3.TestString) + assert.Equal(t, cfg.TestInt, cfg3.TestInt) + assert.NotEqual(t, cfg3, exemplar) +} diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go new file mode 100644 index 0000000000..f99675a9f9 --- /dev/null +++ b/modules/queue/queue_wrapped.go @@ -0,0 +1,183 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// WrappedQueueType is the type for a wrapped delayed starting queue +const WrappedQueueType Type = "wrapped" + +// WrappedQueueConfiguration is the configuration for a WrappedQueue +type WrappedQueueConfiguration struct { + Underlying Type + Timeout time.Duration + MaxAttempts int + Config interface{} + QueueLength int +} + +type delayedStarter struct { + lock sync.Mutex + internal Queue + underlying Type + cfg interface{} + timeout time.Duration + maxAttempts int +} + +func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) { + var ctx context.Context + var cancel context.CancelFunc + if q.timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), q.timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + + defer cancel() + // Ensure we also stop at shutdown + atShutdown(ctx, func() { + cancel() + }) + + i := 1 + for q.internal == nil { + select { + case <-ctx.Done(): + q.lock.Unlock() + log.Fatal("Timedout creating queue %v with cfg %v ", q.underlying, q.cfg) + default: + queue, err := CreateQueue(q.underlying, handle, q.cfg, exemplar) + if err == nil { + q.internal = queue + q.lock.Unlock() + break + } + if err.Error() != "resource temporarily unavailable" { + log.Warn("[Attempt: %d] Failed to create queue: %v cfg: %v error: %v", i, q.underlying, q.cfg, err) + } + i++ + if q.maxAttempts > 0 && i > q.maxAttempts { + q.lock.Unlock() + log.Fatal("Unable to create queue %v with cfg %v by max attempts: error: %v", q.underlying, q.cfg, err) + } + sleepTime := 100 * time.Millisecond + if q.timeout > 0 && q.maxAttempts > 0 { + sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts) + } + time.Sleep(sleepTime) + } + } +} + +// WrappedQueue wraps a delayed starting queue +type WrappedQueue struct { + delayedStarter + handle HandlerFunc + exemplar interface{} + channel chan Data +} + +// NewWrappedQueue will attempt to create a queue of the provided type, +// but if there is a problem creating this queue it will instead create +// a WrappedQueue with delayed the startup of the queue instead and a +// channel which will be redirected to the queue +func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(WrappedQueueConfiguration) + + queue, err := CreateQueue(config.Underlying, handle, config.Config, exemplar) + if err == nil { + // Just return the queue there is no need to wrap + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + return &WrappedQueue{ + handle: handle, + channel: make(chan Data, config.QueueLength), + exemplar: exemplar, + delayedStarter: delayedStarter{ + cfg: config.Config, + underlying: config.Underlying, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + }, + }, nil +} + +// Push will push the data to the internal channel checking it against the exemplar +func (q *WrappedQueue) Push(data Data) error { + if q.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(q.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + } + q.channel <- data + return nil +} + +// Run starts to run the queue and attempts to create the internal queue +func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + q.lock.Lock() + if q.internal == nil { + q.setInternal(atShutdown, q.handle, q.exemplar) + go func() { + for data := range q.channel { + _ = q.internal.Push(data) + } + }() + } else { + q.lock.Unlock() + } + + q.internal.Run(atShutdown, atTerminate) +} + +// Shutdown this queue and stop processing +func (q *WrappedQueue) Shutdown() { + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + if shutdownable, ok := q.internal.(Shutdownable); ok { + shutdownable.Shutdown() + } +} + +// Terminate this queue and close the queue +func (q *WrappedQueue) Terminate() { + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + if shutdownable, ok := q.internal.(Shutdownable); ok { + shutdownable.Terminate() + } +} + +func init() { + queuesMap[WrappedQueueType] = NewWrappedQueue +} diff --git a/modules/setting/queue.go b/modules/setting/queue.go new file mode 100644 index 0000000000..4c80c79079 --- /dev/null +++ b/modules/setting/queue.go @@ -0,0 +1,143 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package setting + +import ( + "encoding/json" + "path" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" +) + +type queueSettings struct { + DataDir string + Length int + BatchLength int + ConnectionString string + Type string + Addresses string + Password string + DBIndex int + WrapIfNecessary bool + MaxAttempts int + Timeout time.Duration + Workers int +} + +// Queue settings +var Queue = queueSettings{} + +// CreateQueue for name with provided handler and exemplar +func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) queue.Queue { + q := getQueueSettings(name) + opts := make(map[string]interface{}) + opts["QueueLength"] = q.Length + opts["BatchLength"] = q.BatchLength + opts["DataDir"] = q.DataDir + opts["Addresses"] = q.Addresses + opts["Password"] = q.Password + opts["DBIndex"] = q.DBIndex + opts["QueueName"] = name + + cfg, err := json.Marshal(opts) + if err != nil { + log.Error("Unable to marshall generic options: %v Error: %v", opts, err) + log.Error("Unable to create queue for %s", name, err) + return nil + } + + returnable, err := queue.CreateQueue(queue.Type(q.Type), handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = queue.CreateQueue(queue.WrappedQueueType, handle, queue.WrappedQueueConfiguration{ + Underlying: queue.Type(q.Type), + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create queue for %s: %v", name, err) + return nil + } + return returnable +} + +func getQueueSettings(name string) queueSettings { + q := queueSettings{} + sec := Cfg.Section("queue." + name) + // DataDir is not directly inheritable + q.DataDir = path.Join(Queue.DataDir, name) + for _, key := range sec.Keys() { + switch key.Name() { + case "DATADIR": + q.DataDir = key.MustString(q.DataDir) + } + } + if !path.IsAbs(q.DataDir) { + q.DataDir = path.Join(AppDataPath, q.DataDir) + } + sec.Key("DATADIR").SetValue(q.DataDir) + // The rest are... + q.Length = sec.Key("LENGTH").MustInt(Queue.Length) + q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) + q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) + validTypes := queue.RegisteredTypesAsString() + q.Type = sec.Key("TYPE").In(Queue.Type, validTypes) + q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary) + q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) + q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) + q.Workers = sec.Key("WORKER").MustInt(Queue.Workers) + + q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) + return q +} + +func newQueueService() { + sec := Cfg.Section("queue") + Queue.DataDir = sec.Key("DATADIR").MustString("queues/") + if !path.IsAbs(Queue.DataDir) { + Queue.DataDir = path.Join(AppDataPath, Queue.DataDir) + } + Queue.Length = sec.Key("LENGTH").MustInt(20) + Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) + Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) + validTypes := queue.RegisteredTypesAsString() + Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes) + Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) + Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) + Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) + Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) + Queue.Workers = sec.Key("WORKER").MustInt(1) +} + +// ParseQueueConnStr parses a queue connection string +func ParseQueueConnStr(connStr string) (addrs, password string, dbIdx int, err error) { + fields := strings.Fields(connStr) + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "addrs": + addrs = items[1] + case "password": + password = items[1] + case "db": + dbIdx, err = strconv.Atoi(items[1]) + if err != nil { + return + } + } + } + return +}