mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-01-15 11:35:31 +00:00
Queue: Add generic graceful queues with settings
This commit is contained in:
parent
b983ff4473
commit
85042634fc
15 changed files with 1466 additions and 0 deletions
|
@ -371,6 +371,19 @@ REPO_INDEXER_INCLUDE =
|
||||||
; A comma separated list of glob patterns to exclude from the index; ; default is empty
|
; A comma separated list of glob patterns to exclude from the index; ; default is empty
|
||||||
REPO_INDEXER_EXCLUDE =
|
REPO_INDEXER_EXCLUDE =
|
||||||
|
|
||||||
|
[queue]
|
||||||
|
; General queue queue type, currently support: persistable-channel, channel, level, redis, dummy
|
||||||
|
; default to persistable-channel
|
||||||
|
TYPE = persistable-channel
|
||||||
|
; data-dir for storing persistable queues and level queues, individual queues will be named by their type
|
||||||
|
DATADIR = queues/
|
||||||
|
; Default queue length before a channel queue will block
|
||||||
|
LENGTH = 20
|
||||||
|
; Batch size to send for batched queues
|
||||||
|
BATCH_LENGTH = 20
|
||||||
|
; Connection string for redis queues this will store the redis connection string.
|
||||||
|
CONN_STR = "addrs=127.0.0.1:6379 db=0"
|
||||||
|
|
||||||
[admin]
|
[admin]
|
||||||
; Disallow regular (non-admin) users from creating organizations.
|
; Disallow regular (non-admin) users from creating organizations.
|
||||||
DISABLE_REGULAR_ORG_CREATION = false
|
DISABLE_REGULAR_ORG_CREATION = false
|
||||||
|
|
|
@ -234,6 +234,14 @@ relation to port exhaustion.
|
||||||
- `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed.
|
- `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed.
|
||||||
- `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout.
|
- `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout.
|
||||||
|
|
||||||
|
## Queue (`queue`)
|
||||||
|
|
||||||
|
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `batched-channel`, `channel`, `level`, `redis`, `dummy`
|
||||||
|
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues.
|
||||||
|
- `LENGTH`: **20**: Maximal queue size before channel queues block
|
||||||
|
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
|
||||||
|
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
|
||||||
|
|
||||||
## Admin (`admin`)
|
## Admin (`admin`)
|
||||||
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
|
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
|
||||||
|
|
||||||
|
|
128
modules/queue/queue.go
Normal file
128
modules/queue/queue.go
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrInvalidConfiguration is called when there is invalid configuration for a queue
|
||||||
|
type ErrInvalidConfiguration struct {
|
||||||
|
cfg interface{}
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err ErrInvalidConfiguration) Error() string {
|
||||||
|
if err.err != nil {
|
||||||
|
return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
|
||||||
|
func IsErrInvalidConfiguration(err error) bool {
|
||||||
|
_, ok := err.(ErrInvalidConfiguration)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type is a type of Queue
|
||||||
|
type Type string
|
||||||
|
|
||||||
|
// Data defines an type of queuable data
|
||||||
|
type Data interface{}
|
||||||
|
|
||||||
|
// HandlerFunc is a function that takes a variable amount of data and processes it
|
||||||
|
type HandlerFunc func(...Data)
|
||||||
|
|
||||||
|
// NewQueueFunc is a function that creates a queue
|
||||||
|
type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
|
||||||
|
|
||||||
|
// Shutdownable represents a queue that can be shutdown
|
||||||
|
type Shutdownable interface {
|
||||||
|
Shutdown()
|
||||||
|
Terminate()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue defines an interface to save an issue indexer queue
|
||||||
|
type Queue interface {
|
||||||
|
Run(atShutdown, atTerminate func(context.Context, func()))
|
||||||
|
Push(Data) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// DummyQueueType is the type for the dummy queue
|
||||||
|
const DummyQueueType Type = "dummy"
|
||||||
|
|
||||||
|
// NewDummyQueue creates a new DummyQueue
|
||||||
|
func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
|
||||||
|
return &DummyQueue{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DummyQueue represents an empty queue
|
||||||
|
type DummyQueue struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue
|
||||||
|
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
|
||||||
|
|
||||||
|
// Push pushes data to the queue
|
||||||
|
func (b *DummyQueue) Push(Data) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toConfig(exemplar, cfg interface{}) (interface{}, error) {
|
||||||
|
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
configBytes, ok := cfg.([]byte)
|
||||||
|
if !ok {
|
||||||
|
configStr, ok := cfg.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||||
|
}
|
||||||
|
configBytes = []byte(configStr)
|
||||||
|
}
|
||||||
|
newVal := reflect.New(reflect.TypeOf(exemplar))
|
||||||
|
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
|
||||||
|
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
|
||||||
|
}
|
||||||
|
return newVal.Elem().Interface(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
|
||||||
|
|
||||||
|
// RegisteredTypes provides the list of requested types of queues
|
||||||
|
func RegisteredTypes() []Type {
|
||||||
|
types := make([]Type, len(queuesMap))
|
||||||
|
i := 0
|
||||||
|
for key := range queuesMap {
|
||||||
|
types[i] = key
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
return types
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisteredTypesAsString provides the list of requested types of queues
|
||||||
|
func RegisteredTypesAsString() []string {
|
||||||
|
types := make([]string, len(queuesMap))
|
||||||
|
i := 0
|
||||||
|
for key := range queuesMap {
|
||||||
|
types[i] = string(key)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
return types
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
|
||||||
|
func CreateQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
|
||||||
|
newFn, ok := queuesMap[queueType]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
|
||||||
|
}
|
||||||
|
return newFn(handlerFunc, opts, exemplar)
|
||||||
|
}
|
78
modules/queue/queue_batch.go
Normal file
78
modules/queue/queue_batch.go
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BatchedChannelQueueType is the type for batched channel queue
|
||||||
|
const BatchedChannelQueueType Type = "batched-channel"
|
||||||
|
|
||||||
|
// BatchedChannelQueueConfiguration is the configuration for a BatchedChannelQueue
|
||||||
|
type BatchedChannelQueueConfiguration struct {
|
||||||
|
QueueLength int
|
||||||
|
BatchLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchedChannelQueue implements
|
||||||
|
type BatchedChannelQueue struct {
|
||||||
|
*ChannelQueue
|
||||||
|
batchLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatchedChannelQueue create a memory channel queue
|
||||||
|
func NewBatchedChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(BatchedChannelQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(BatchedChannelQueueConfiguration)
|
||||||
|
return &BatchedChannelQueue{
|
||||||
|
&ChannelQueue{
|
||||||
|
queue: make(chan Data, config.QueueLength),
|
||||||
|
handle: handle,
|
||||||
|
exemplar: exemplar,
|
||||||
|
},
|
||||||
|
config.BatchLength,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue
|
||||||
|
func (c *BatchedChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
atShutdown(context.Background(), func() {
|
||||||
|
log.Warn("BatchedChannelQueue is not shutdownable!")
|
||||||
|
})
|
||||||
|
atTerminate(context.Background(), func() {
|
||||||
|
log.Warn("BatchedChannelQueue is not terminatable!")
|
||||||
|
})
|
||||||
|
go func() {
|
||||||
|
delay := time.Millisecond * 300
|
||||||
|
var datas = make([]Data, 0, c.batchLength)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case data := <-c.queue:
|
||||||
|
datas = append(datas, data)
|
||||||
|
if len(datas) >= c.batchLength {
|
||||||
|
c.handle(datas...)
|
||||||
|
datas = make([]Data, 0, c.batchLength)
|
||||||
|
}
|
||||||
|
case <-time.After(delay):
|
||||||
|
delay = time.Millisecond * 100
|
||||||
|
if len(datas) > 0 {
|
||||||
|
c.handle(datas...)
|
||||||
|
datas = make([]Data, 0, c.batchLength)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[BatchedChannelQueueType] = NewBatchedChannelQueue
|
||||||
|
}
|
46
modules/queue/queue_batch_test.go
Normal file
46
modules/queue/queue_batch_test.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
import "github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
func TestBatchedChannelQueue(t *testing.T) {
|
||||||
|
handleChan := make(chan *testData)
|
||||||
|
handle := func(data ...Data) {
|
||||||
|
assert.True(t, len(data) == 2)
|
||||||
|
for _, datum := range data {
|
||||||
|
testDatum := datum.(*testData)
|
||||||
|
handleChan <- testDatum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nilFn := func(_ context.Context, _ func()) {}
|
||||||
|
|
||||||
|
queue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{QueueLength: 20, BatchLength: 2}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(nilFn, nilFn)
|
||||||
|
|
||||||
|
test1 := testData{"A", 1}
|
||||||
|
test2 := testData{"B", 2}
|
||||||
|
|
||||||
|
queue.Push(&test1)
|
||||||
|
go queue.Push(&test2)
|
||||||
|
|
||||||
|
result1 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result1.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result1.TestInt)
|
||||||
|
|
||||||
|
result2 := <-handleChan
|
||||||
|
assert.Equal(t, test2.TestString, result2.TestString)
|
||||||
|
assert.Equal(t, test2.TestInt, result2.TestInt)
|
||||||
|
|
||||||
|
err = queue.Push(test1)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
75
modules/queue/queue_channel.go
Normal file
75
modules/queue/queue_channel.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChannelQueueType is the type for channel queue
|
||||||
|
const ChannelQueueType Type = "channel"
|
||||||
|
|
||||||
|
// ChannelQueueConfiguration is the configuration for a ChannelQueue
|
||||||
|
type ChannelQueueConfiguration struct {
|
||||||
|
QueueLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChannelQueue implements
|
||||||
|
type ChannelQueue struct {
|
||||||
|
queue chan Data
|
||||||
|
handle HandlerFunc
|
||||||
|
exemplar interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChannelQueue create a memory channel queue
|
||||||
|
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(ChannelQueueConfiguration)
|
||||||
|
return &ChannelQueue{
|
||||||
|
queue: make(chan Data, config.QueueLength),
|
||||||
|
handle: handle,
|
||||||
|
exemplar: exemplar,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue
|
||||||
|
func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
atShutdown(context.Background(), func() {
|
||||||
|
log.Warn("ChannelQueue is not shutdownable!")
|
||||||
|
})
|
||||||
|
atTerminate(context.Background(), func() {
|
||||||
|
log.Warn("ChannelQueue is not terminatable!")
|
||||||
|
})
|
||||||
|
go func() {
|
||||||
|
for data := range c.queue {
|
||||||
|
c.handle(data)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push will push the indexer data to queue
|
||||||
|
func (c *ChannelQueue) Push(data Data) error {
|
||||||
|
if c.exemplar != nil {
|
||||||
|
// Assert data is of same type as r.exemplar
|
||||||
|
t := reflect.TypeOf(data)
|
||||||
|
exemplarType := reflect.TypeOf(c.exemplar)
|
||||||
|
if !t.AssignableTo(exemplarType) || data == nil {
|
||||||
|
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.queue <- data
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[ChannelQueueType] = NewChannelQueue
|
||||||
|
}
|
38
modules/queue/queue_channel_test.go
Normal file
38
modules/queue/queue_channel_test.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestChannelQueue(t *testing.T) {
|
||||||
|
handleChan := make(chan *testData)
|
||||||
|
handle := func(data ...Data) {
|
||||||
|
for _, datum := range data {
|
||||||
|
testDatum := datum.(*testData)
|
||||||
|
handleChan <- testDatum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nilFn := func(_ context.Context, _ func()) {}
|
||||||
|
|
||||||
|
queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{QueueLength: 20}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(nilFn, nilFn)
|
||||||
|
|
||||||
|
test1 := testData{"A", 1}
|
||||||
|
go queue.Push(&test1)
|
||||||
|
result1 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result1.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result1.TestInt)
|
||||||
|
|
||||||
|
err = queue.Push(test1)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
158
modules/queue/queue_disk.go
Normal file
158
modules/queue/queue_disk.go
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
|
||||||
|
"gitea.com/lunny/levelqueue"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LevelQueueType is the type for level queue
|
||||||
|
const LevelQueueType Type = "level"
|
||||||
|
|
||||||
|
// LevelQueueConfiguration is the configuration for a LevelQueue
|
||||||
|
type LevelQueueConfiguration struct {
|
||||||
|
DataDir string
|
||||||
|
BatchLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// LevelQueue implements a disk library queue
|
||||||
|
type LevelQueue struct {
|
||||||
|
handle HandlerFunc
|
||||||
|
queue *levelqueue.Queue
|
||||||
|
batchLength int
|
||||||
|
closed chan struct{}
|
||||||
|
exemplar interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLevelQueue creates a ledis local queue
|
||||||
|
func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(LevelQueueConfiguration)
|
||||||
|
|
||||||
|
queue, err := levelqueue.Open(config.DataDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &LevelQueue{
|
||||||
|
handle: handle,
|
||||||
|
queue: queue,
|
||||||
|
batchLength: config.BatchLength,
|
||||||
|
exemplar: exemplar,
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue
|
||||||
|
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
atShutdown(context.Background(), l.Shutdown)
|
||||||
|
atTerminate(context.Background(), l.Terminate)
|
||||||
|
var i int
|
||||||
|
var datas = make([]Data, 0, l.batchLength)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-l.closed:
|
||||||
|
if len(datas) > 0 {
|
||||||
|
log.Trace("Handling: %d data, %v", len(datas), datas)
|
||||||
|
l.handle(datas...)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if len(datas) > l.batchLength || (len(datas) > 0 && i > 3) {
|
||||||
|
log.Trace("Handling: %d data, %v", len(datas), datas)
|
||||||
|
l.handle(datas...)
|
||||||
|
datas = make([]Data, 0, l.batchLength)
|
||||||
|
i = 0
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err := l.queue.RPop()
|
||||||
|
if err != nil {
|
||||||
|
if err != levelqueue.ErrNotFound {
|
||||||
|
log.Error("RPop: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(bs) == 0 {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var data Data
|
||||||
|
if l.exemplar != nil {
|
||||||
|
t := reflect.TypeOf(l.exemplar)
|
||||||
|
n := reflect.New(t)
|
||||||
|
ne := n.Elem()
|
||||||
|
err = json.Unmarshal(bs, ne.Addr().Interface())
|
||||||
|
data = ne.Interface().(Data)
|
||||||
|
} else {
|
||||||
|
err = json.Unmarshal(bs, &data)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unmarshal: %v", err)
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace("LevelQueue: task found: %#v", data)
|
||||||
|
|
||||||
|
datas = append(datas, data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push will push the indexer data to queue
|
||||||
|
func (l *LevelQueue) Push(data Data) error {
|
||||||
|
if l.exemplar != nil {
|
||||||
|
// Assert data is of same type as r.exemplar
|
||||||
|
value := reflect.ValueOf(data)
|
||||||
|
t := value.Type()
|
||||||
|
exemplarType := reflect.ValueOf(l.exemplar).Type()
|
||||||
|
if !t.AssignableTo(exemplarType) || data == nil {
|
||||||
|
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.queue.LPush(bs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown this queue and stop processing
|
||||||
|
func (l *LevelQueue) Shutdown() {
|
||||||
|
select {
|
||||||
|
case <-l.closed:
|
||||||
|
default:
|
||||||
|
close(l.closed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate this queue and close the queue
|
||||||
|
func (l *LevelQueue) Terminate() {
|
||||||
|
l.Shutdown()
|
||||||
|
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
|
||||||
|
log.Error("Error whilst closing internal queue: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[LevelQueueType] = NewLevelQueue
|
||||||
|
}
|
160
modules/queue/queue_disk_channel.go
Normal file
160
modules/queue/queue_disk_channel.go
Normal file
|
@ -0,0 +1,160 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PersistableChannelQueueType is the type for persistable queue
|
||||||
|
const PersistableChannelQueueType Type = "persistable-channel"
|
||||||
|
|
||||||
|
// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
|
||||||
|
type PersistableChannelQueueConfiguration struct {
|
||||||
|
DataDir string
|
||||||
|
BatchLength int
|
||||||
|
QueueLength int
|
||||||
|
Timeout time.Duration
|
||||||
|
MaxAttempts int
|
||||||
|
}
|
||||||
|
|
||||||
|
// PersistableChannelQueue wraps a channel queue and level queue together
|
||||||
|
type PersistableChannelQueue struct {
|
||||||
|
*BatchedChannelQueue
|
||||||
|
delayedStarter
|
||||||
|
closed chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
|
||||||
|
// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
|
||||||
|
func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(PersistableChannelQueueConfiguration)
|
||||||
|
|
||||||
|
batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{
|
||||||
|
QueueLength: config.QueueLength,
|
||||||
|
BatchLength: config.BatchLength,
|
||||||
|
}, exemplar)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
levelCfg := LevelQueueConfiguration{
|
||||||
|
DataDir: config.DataDir,
|
||||||
|
BatchLength: config.BatchLength,
|
||||||
|
}
|
||||||
|
|
||||||
|
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
|
||||||
|
if err == nil {
|
||||||
|
return &PersistableChannelQueue{
|
||||||
|
BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue),
|
||||||
|
delayedStarter: delayedStarter{
|
||||||
|
internal: levelQueue.(*LevelQueue),
|
||||||
|
},
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
if IsErrInvalidConfiguration(err) {
|
||||||
|
// Retrying ain't gonna make this any better...
|
||||||
|
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PersistableChannelQueue{
|
||||||
|
BatchedChannelQueue: batchChannelQueue.(*BatchedChannelQueue),
|
||||||
|
delayedStarter: delayedStarter{
|
||||||
|
cfg: levelCfg,
|
||||||
|
underlying: LevelQueueType,
|
||||||
|
timeout: config.Timeout,
|
||||||
|
maxAttempts: config.MaxAttempts,
|
||||||
|
},
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push will push the indexer data to queue
|
||||||
|
func (p *PersistableChannelQueue) Push(data Data) error {
|
||||||
|
select {
|
||||||
|
case <-p.closed:
|
||||||
|
return p.internal.Push(data)
|
||||||
|
default:
|
||||||
|
return p.BatchedChannelQueue.Push(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue
|
||||||
|
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
p.lock.Lock()
|
||||||
|
if p.internal == nil {
|
||||||
|
p.setInternal(atShutdown, p.handle, p.exemplar)
|
||||||
|
} else {
|
||||||
|
p.lock.Unlock()
|
||||||
|
}
|
||||||
|
atShutdown(context.Background(), p.Shutdown)
|
||||||
|
atTerminate(context.Background(), p.Terminate)
|
||||||
|
|
||||||
|
// Just run the level queue - we shut it down later
|
||||||
|
go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
|
||||||
|
delay := time.Millisecond * 300
|
||||||
|
var datas = make([]Data, 0, p.batchLength)
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case data := <-p.queue:
|
||||||
|
datas = append(datas, data)
|
||||||
|
if len(datas) >= p.batchLength {
|
||||||
|
p.handle(datas...)
|
||||||
|
datas = make([]Data, 0, p.batchLength)
|
||||||
|
}
|
||||||
|
case <-time.After(delay):
|
||||||
|
delay = time.Millisecond * 100
|
||||||
|
if len(datas) > 0 {
|
||||||
|
p.handle(datas...)
|
||||||
|
datas = make([]Data, 0, p.batchLength)
|
||||||
|
}
|
||||||
|
case <-p.closed:
|
||||||
|
if len(datas) > 0 {
|
||||||
|
p.handle(datas...)
|
||||||
|
}
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for data := range p.queue {
|
||||||
|
_ = p.internal.Push(data)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown processing this queue
|
||||||
|
func (p *PersistableChannelQueue) Shutdown() {
|
||||||
|
select {
|
||||||
|
case <-p.closed:
|
||||||
|
default:
|
||||||
|
close(p.closed)
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
if p.internal != nil {
|
||||||
|
p.internal.(*LevelQueue).Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate this queue and close the queue
|
||||||
|
func (p *PersistableChannelQueue) Terminate() {
|
||||||
|
p.Shutdown()
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
if p.internal != nil {
|
||||||
|
p.internal.(*LevelQueue).Terminate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
|
||||||
|
}
|
105
modules/queue/queue_disk_channel_test.go
Normal file
105
modules/queue/queue_disk_channel_test.go
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPersistableChannelQueue(t *testing.T) {
|
||||||
|
handleChan := make(chan *testData)
|
||||||
|
handle := func(data ...Data) {
|
||||||
|
assert.True(t, len(data) == 2)
|
||||||
|
for _, datum := range data {
|
||||||
|
testDatum := datum.(*testData)
|
||||||
|
handleChan <- testDatum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var queueShutdown func()
|
||||||
|
var queueTerminate func()
|
||||||
|
|
||||||
|
tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
|
||||||
|
DataDir: tmpDir,
|
||||||
|
BatchLength: 2,
|
||||||
|
QueueLength: 20,
|
||||||
|
}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(func(_ context.Context, shutdown func()) {
|
||||||
|
queueShutdown = shutdown
|
||||||
|
}, func(_ context.Context, terminate func()) {
|
||||||
|
queueTerminate = terminate
|
||||||
|
})
|
||||||
|
|
||||||
|
test1 := testData{"A", 1}
|
||||||
|
test2 := testData{"B", 2}
|
||||||
|
|
||||||
|
err = queue.Push(&test1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
go func() {
|
||||||
|
err = queue.Push(&test2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
result1 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result1.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result1.TestInt)
|
||||||
|
|
||||||
|
result2 := <-handleChan
|
||||||
|
assert.Equal(t, test2.TestString, result2.TestString)
|
||||||
|
assert.Equal(t, test2.TestInt, result2.TestInt)
|
||||||
|
|
||||||
|
err = queue.Push(test1)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
queueShutdown()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
err = queue.Push(&test1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = queue.Push(&test2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
select {
|
||||||
|
case <-handleChan:
|
||||||
|
assert.Fail(t, "Handler processing should have stopped")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
queueTerminate()
|
||||||
|
|
||||||
|
// Reopen queue
|
||||||
|
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
|
||||||
|
DataDir: tmpDir,
|
||||||
|
BatchLength: 2,
|
||||||
|
QueueLength: 20,
|
||||||
|
}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(func(_ context.Context, shutdown func()) {
|
||||||
|
queueShutdown = shutdown
|
||||||
|
}, func(_ context.Context, terminate func()) {
|
||||||
|
queueTerminate = terminate
|
||||||
|
})
|
||||||
|
|
||||||
|
result3 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result3.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result3.TestInt)
|
||||||
|
|
||||||
|
result4 := <-handleChan
|
||||||
|
assert.Equal(t, test2.TestString, result4.TestString)
|
||||||
|
assert.Equal(t, test2.TestInt, result4.TestInt)
|
||||||
|
queueShutdown()
|
||||||
|
queueTerminate()
|
||||||
|
|
||||||
|
}
|
99
modules/queue/queue_disk_test.go
Normal file
99
modules/queue/queue_disk_test.go
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLevelQueue(t *testing.T) {
|
||||||
|
handleChan := make(chan *testData)
|
||||||
|
handle := func(data ...Data) {
|
||||||
|
assert.True(t, len(data) == 2)
|
||||||
|
for _, datum := range data {
|
||||||
|
testDatum := datum.(*testData)
|
||||||
|
handleChan <- testDatum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var queueShutdown func()
|
||||||
|
var queueTerminate func()
|
||||||
|
|
||||||
|
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
|
||||||
|
DataDir: "level-queue-test-data",
|
||||||
|
BatchLength: 2,
|
||||||
|
}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(func(_ context.Context, shutdown func()) {
|
||||||
|
queueShutdown = shutdown
|
||||||
|
}, func(_ context.Context, terminate func()) {
|
||||||
|
queueTerminate = terminate
|
||||||
|
})
|
||||||
|
|
||||||
|
test1 := testData{"A", 1}
|
||||||
|
test2 := testData{"B", 2}
|
||||||
|
|
||||||
|
err = queue.Push(&test1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
go func() {
|
||||||
|
err = queue.Push(&test2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
result1 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result1.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result1.TestInt)
|
||||||
|
|
||||||
|
result2 := <-handleChan
|
||||||
|
assert.Equal(t, test2.TestString, result2.TestString)
|
||||||
|
assert.Equal(t, test2.TestInt, result2.TestInt)
|
||||||
|
|
||||||
|
err = queue.Push(test1)
|
||||||
|
assert.Error(t, err)
|
||||||
|
|
||||||
|
queueShutdown()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
err = queue.Push(&test1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = queue.Push(&test2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
select {
|
||||||
|
case <-handleChan:
|
||||||
|
assert.Fail(t, "Handler processing should have stopped")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
queueTerminate()
|
||||||
|
|
||||||
|
// Reopen queue
|
||||||
|
queue, err = NewLevelQueue(handle, LevelQueueConfiguration{
|
||||||
|
DataDir: "level-queue-test-data",
|
||||||
|
BatchLength: 2,
|
||||||
|
}, &testData{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
go queue.Run(func(_ context.Context, shutdown func()) {
|
||||||
|
queueShutdown = shutdown
|
||||||
|
}, func(_ context.Context, terminate func()) {
|
||||||
|
queueTerminate = terminate
|
||||||
|
})
|
||||||
|
|
||||||
|
result3 := <-handleChan
|
||||||
|
assert.Equal(t, test1.TestString, result3.TestString)
|
||||||
|
assert.Equal(t, test1.TestInt, result3.TestInt)
|
||||||
|
|
||||||
|
result4 := <-handleChan
|
||||||
|
assert.Equal(t, test2.TestString, result4.TestString)
|
||||||
|
assert.Equal(t, test2.TestInt, result4.TestInt)
|
||||||
|
queueShutdown()
|
||||||
|
queueTerminate()
|
||||||
|
|
||||||
|
os.RemoveAll("level-queue-test-data")
|
||||||
|
}
|
190
modules/queue/queue_redis.go
Normal file
190
modules/queue/queue_redis.go
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RedisQueueType is the type for redis queue
|
||||||
|
const RedisQueueType Type = "redis"
|
||||||
|
|
||||||
|
type redisClient interface {
|
||||||
|
RPush(key string, args ...interface{}) *redis.IntCmd
|
||||||
|
LPop(key string) *redis.StringCmd
|
||||||
|
Ping() *redis.StatusCmd
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedisQueue redis queue
|
||||||
|
type RedisQueue struct {
|
||||||
|
client redisClient
|
||||||
|
queueName string
|
||||||
|
handle HandlerFunc
|
||||||
|
batchLength int
|
||||||
|
closed chan struct{}
|
||||||
|
exemplar interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedisQueueConfiguration is the configuration for the redis queue
|
||||||
|
type RedisQueueConfiguration struct {
|
||||||
|
Addresses string
|
||||||
|
Password string
|
||||||
|
DBIndex int
|
||||||
|
BatchLength int
|
||||||
|
QueueName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRedisQueue creates single redis or cluster redis queue
|
||||||
|
func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(RedisQueueConfiguration)
|
||||||
|
|
||||||
|
dbs := strings.Split(config.Addresses, ",")
|
||||||
|
var queue = RedisQueue{
|
||||||
|
queueName: config.QueueName,
|
||||||
|
handle: handle,
|
||||||
|
batchLength: config.BatchLength,
|
||||||
|
exemplar: exemplar,
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}
|
||||||
|
if len(dbs) == 0 {
|
||||||
|
return nil, errors.New("no redis host found")
|
||||||
|
} else if len(dbs) == 1 {
|
||||||
|
queue.client = redis.NewClient(&redis.Options{
|
||||||
|
Addr: strings.TrimSpace(dbs[0]), // use default Addr
|
||||||
|
Password: config.Password, // no password set
|
||||||
|
DB: config.DBIndex, // use default DB
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
queue.client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||||
|
Addrs: dbs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := queue.client.Ping().Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &queue, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the redis queue
|
||||||
|
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
atShutdown(context.Background(), r.Shutdown)
|
||||||
|
atTerminate(context.Background(), r.Terminate)
|
||||||
|
var i int
|
||||||
|
var datas = make([]Data, 0, r.batchLength)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-r.closed:
|
||||||
|
if len(datas) > 0 {
|
||||||
|
log.Trace("Handling: %d data, %v", len(datas), datas)
|
||||||
|
r.handle(datas...)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
bs, err := r.client.LPop(r.queueName).Bytes()
|
||||||
|
if err != nil && err != redis.Nil {
|
||||||
|
log.Error("LPop failed: %v", err)
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
i++
|
||||||
|
if len(datas) > r.batchLength || (len(datas) > 0 && i > 3) {
|
||||||
|
log.Trace("Handling: %d data, %v", len(datas), datas)
|
||||||
|
r.handle(datas...)
|
||||||
|
datas = make([]Data, 0, r.batchLength)
|
||||||
|
i = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(bs) == 0 {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var data Data
|
||||||
|
if r.exemplar != nil {
|
||||||
|
t := reflect.TypeOf(r.exemplar)
|
||||||
|
n := reflect.New(t)
|
||||||
|
ne := n.Elem()
|
||||||
|
err = json.Unmarshal(bs, ne.Addr().Interface())
|
||||||
|
data = ne.Interface().(Data)
|
||||||
|
} else {
|
||||||
|
err = json.Unmarshal(bs, &data)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unmarshal: %v", err)
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace("RedisQueue: task found: %#v", data)
|
||||||
|
|
||||||
|
datas = append(datas, data)
|
||||||
|
select {
|
||||||
|
case <-r.closed:
|
||||||
|
if len(datas) > 0 {
|
||||||
|
log.Trace("Handling: %d data, %v", len(datas), datas)
|
||||||
|
r.handle(datas...)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push implements Queue
|
||||||
|
func (r *RedisQueue) Push(data Data) error {
|
||||||
|
if r.exemplar != nil {
|
||||||
|
// Assert data is of same type as r.exemplar
|
||||||
|
value := reflect.ValueOf(data)
|
||||||
|
t := value.Type()
|
||||||
|
exemplarType := reflect.ValueOf(r.exemplar).Type()
|
||||||
|
if !t.AssignableTo(exemplarType) || data == nil {
|
||||||
|
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return r.client.RPush(r.queueName, bs).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown processing from this queue
|
||||||
|
func (r *RedisQueue) Shutdown() {
|
||||||
|
select {
|
||||||
|
case <-r.closed:
|
||||||
|
default:
|
||||||
|
close(r.closed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate this queue and close the queue
|
||||||
|
func (r *RedisQueue) Terminate() {
|
||||||
|
r.Shutdown()
|
||||||
|
if err := r.client.Close(); err != nil {
|
||||||
|
log.Error("Error whilst closing internal redis client: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[RedisQueueType] = NewRedisQueue
|
||||||
|
}
|
42
modules/queue/queue_test.go
Normal file
42
modules/queue/queue_test.go
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
import "github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
type testData struct {
|
||||||
|
TestString string
|
||||||
|
TestInt int
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToConfig(t *testing.T) {
|
||||||
|
cfg := testData{
|
||||||
|
TestString: "Config",
|
||||||
|
TestInt: 10,
|
||||||
|
}
|
||||||
|
exemplar := testData{}
|
||||||
|
|
||||||
|
cfg2I, err := toConfig(exemplar, cfg)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
cfg2, ok := (cfg2I).(testData)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.NotEqual(t, cfg2, exemplar)
|
||||||
|
assert.Equal(t, &cfg, &cfg2)
|
||||||
|
|
||||||
|
cfgString, err := json.Marshal(cfg)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
cfg3I, err := toConfig(exemplar, cfgString)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
cfg3, ok := (cfg3I).(testData)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, cfg.TestString, cfg3.TestString)
|
||||||
|
assert.Equal(t, cfg.TestInt, cfg3.TestInt)
|
||||||
|
assert.NotEqual(t, cfg3, exemplar)
|
||||||
|
}
|
183
modules/queue/queue_wrapped.go
Normal file
183
modules/queue/queue_wrapped.go
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WrappedQueueType is the type for a wrapped delayed starting queue
|
||||||
|
const WrappedQueueType Type = "wrapped"
|
||||||
|
|
||||||
|
// WrappedQueueConfiguration is the configuration for a WrappedQueue
|
||||||
|
type WrappedQueueConfiguration struct {
|
||||||
|
Underlying Type
|
||||||
|
Timeout time.Duration
|
||||||
|
MaxAttempts int
|
||||||
|
Config interface{}
|
||||||
|
QueueLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
type delayedStarter struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
internal Queue
|
||||||
|
underlying Type
|
||||||
|
cfg interface{}
|
||||||
|
timeout time.Duration
|
||||||
|
maxAttempts int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) {
|
||||||
|
var ctx context.Context
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
if q.timeout > 0 {
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), q.timeout)
|
||||||
|
} else {
|
||||||
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
defer cancel()
|
||||||
|
// Ensure we also stop at shutdown
|
||||||
|
atShutdown(ctx, func() {
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
i := 1
|
||||||
|
for q.internal == nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
q.lock.Unlock()
|
||||||
|
log.Fatal("Timedout creating queue %v with cfg %v ", q.underlying, q.cfg)
|
||||||
|
default:
|
||||||
|
queue, err := CreateQueue(q.underlying, handle, q.cfg, exemplar)
|
||||||
|
if err == nil {
|
||||||
|
q.internal = queue
|
||||||
|
q.lock.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err.Error() != "resource temporarily unavailable" {
|
||||||
|
log.Warn("[Attempt: %d] Failed to create queue: %v cfg: %v error: %v", i, q.underlying, q.cfg, err)
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if q.maxAttempts > 0 && i > q.maxAttempts {
|
||||||
|
q.lock.Unlock()
|
||||||
|
log.Fatal("Unable to create queue %v with cfg %v by max attempts: error: %v", q.underlying, q.cfg, err)
|
||||||
|
}
|
||||||
|
sleepTime := 100 * time.Millisecond
|
||||||
|
if q.timeout > 0 && q.maxAttempts > 0 {
|
||||||
|
sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts)
|
||||||
|
}
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WrappedQueue wraps a delayed starting queue
|
||||||
|
type WrappedQueue struct {
|
||||||
|
delayedStarter
|
||||||
|
handle HandlerFunc
|
||||||
|
exemplar interface{}
|
||||||
|
channel chan Data
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWrappedQueue will attempt to create a queue of the provided type,
|
||||||
|
// but if there is a problem creating this queue it will instead create
|
||||||
|
// a WrappedQueue with delayed the startup of the queue instead and a
|
||||||
|
// channel which will be redirected to the queue
|
||||||
|
func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||||
|
configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
config := configInterface.(WrappedQueueConfiguration)
|
||||||
|
|
||||||
|
queue, err := CreateQueue(config.Underlying, handle, config.Config, exemplar)
|
||||||
|
if err == nil {
|
||||||
|
// Just return the queue there is no need to wrap
|
||||||
|
return queue, nil
|
||||||
|
}
|
||||||
|
if IsErrInvalidConfiguration(err) {
|
||||||
|
// Retrying ain't gonna make this any better...
|
||||||
|
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &WrappedQueue{
|
||||||
|
handle: handle,
|
||||||
|
channel: make(chan Data, config.QueueLength),
|
||||||
|
exemplar: exemplar,
|
||||||
|
delayedStarter: delayedStarter{
|
||||||
|
cfg: config.Config,
|
||||||
|
underlying: config.Underlying,
|
||||||
|
timeout: config.Timeout,
|
||||||
|
maxAttempts: config.MaxAttempts,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push will push the data to the internal channel checking it against the exemplar
|
||||||
|
func (q *WrappedQueue) Push(data Data) error {
|
||||||
|
if q.exemplar != nil {
|
||||||
|
// Assert data is of same type as r.exemplar
|
||||||
|
value := reflect.ValueOf(data)
|
||||||
|
t := value.Type()
|
||||||
|
exemplarType := reflect.ValueOf(q.exemplar).Type()
|
||||||
|
if !t.AssignableTo(exemplarType) || data == nil {
|
||||||
|
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
q.channel <- data
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts to run the queue and attempts to create the internal queue
|
||||||
|
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||||
|
q.lock.Lock()
|
||||||
|
if q.internal == nil {
|
||||||
|
q.setInternal(atShutdown, q.handle, q.exemplar)
|
||||||
|
go func() {
|
||||||
|
for data := range q.channel {
|
||||||
|
_ = q.internal.Push(data)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
q.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
q.internal.Run(atShutdown, atTerminate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown this queue and stop processing
|
||||||
|
func (q *WrappedQueue) Shutdown() {
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
if q.internal == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if shutdownable, ok := q.internal.(Shutdownable); ok {
|
||||||
|
shutdownable.Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate this queue and close the queue
|
||||||
|
func (q *WrappedQueue) Terminate() {
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
if q.internal == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if shutdownable, ok := q.internal.(Shutdownable); ok {
|
||||||
|
shutdownable.Terminate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
queuesMap[WrappedQueueType] = NewWrappedQueue
|
||||||
|
}
|
143
modules/setting/queue.go
Normal file
143
modules/setting/queue.go
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package setting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type queueSettings struct {
|
||||||
|
DataDir string
|
||||||
|
Length int
|
||||||
|
BatchLength int
|
||||||
|
ConnectionString string
|
||||||
|
Type string
|
||||||
|
Addresses string
|
||||||
|
Password string
|
||||||
|
DBIndex int
|
||||||
|
WrapIfNecessary bool
|
||||||
|
MaxAttempts int
|
||||||
|
Timeout time.Duration
|
||||||
|
Workers int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue settings
|
||||||
|
var Queue = queueSettings{}
|
||||||
|
|
||||||
|
// CreateQueue for name with provided handler and exemplar
|
||||||
|
func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) queue.Queue {
|
||||||
|
q := getQueueSettings(name)
|
||||||
|
opts := make(map[string]interface{})
|
||||||
|
opts["QueueLength"] = q.Length
|
||||||
|
opts["BatchLength"] = q.BatchLength
|
||||||
|
opts["DataDir"] = q.DataDir
|
||||||
|
opts["Addresses"] = q.Addresses
|
||||||
|
opts["Password"] = q.Password
|
||||||
|
opts["DBIndex"] = q.DBIndex
|
||||||
|
opts["QueueName"] = name
|
||||||
|
|
||||||
|
cfg, err := json.Marshal(opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
|
||||||
|
log.Error("Unable to create queue for %s", name, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
returnable, err := queue.CreateQueue(queue.Type(q.Type), handle, cfg, exemplar)
|
||||||
|
if q.WrapIfNecessary && err != nil {
|
||||||
|
log.Warn("Unable to create queue for %s: %v", name, err)
|
||||||
|
log.Warn("Attempting to create wrapped queue")
|
||||||
|
returnable, err = queue.CreateQueue(queue.WrappedQueueType, handle, queue.WrappedQueueConfiguration{
|
||||||
|
Underlying: queue.Type(q.Type),
|
||||||
|
Timeout: q.Timeout,
|
||||||
|
MaxAttempts: q.MaxAttempts,
|
||||||
|
Config: cfg,
|
||||||
|
QueueLength: q.Length,
|
||||||
|
}, exemplar)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to create queue for %s: %v", name, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return returnable
|
||||||
|
}
|
||||||
|
|
||||||
|
func getQueueSettings(name string) queueSettings {
|
||||||
|
q := queueSettings{}
|
||||||
|
sec := Cfg.Section("queue." + name)
|
||||||
|
// DataDir is not directly inheritable
|
||||||
|
q.DataDir = path.Join(Queue.DataDir, name)
|
||||||
|
for _, key := range sec.Keys() {
|
||||||
|
switch key.Name() {
|
||||||
|
case "DATADIR":
|
||||||
|
q.DataDir = key.MustString(q.DataDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !path.IsAbs(q.DataDir) {
|
||||||
|
q.DataDir = path.Join(AppDataPath, q.DataDir)
|
||||||
|
}
|
||||||
|
sec.Key("DATADIR").SetValue(q.DataDir)
|
||||||
|
// The rest are...
|
||||||
|
q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
|
||||||
|
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
||||||
|
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
||||||
|
validTypes := queue.RegisteredTypesAsString()
|
||||||
|
q.Type = sec.Key("TYPE").In(Queue.Type, validTypes)
|
||||||
|
q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
|
||||||
|
q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
|
||||||
|
q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
|
||||||
|
q.Workers = sec.Key("WORKER").MustInt(Queue.Workers)
|
||||||
|
|
||||||
|
q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
func newQueueService() {
|
||||||
|
sec := Cfg.Section("queue")
|
||||||
|
Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
|
||||||
|
if !path.IsAbs(Queue.DataDir) {
|
||||||
|
Queue.DataDir = path.Join(AppDataPath, Queue.DataDir)
|
||||||
|
}
|
||||||
|
Queue.Length = sec.Key("LENGTH").MustInt(20)
|
||||||
|
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
||||||
|
Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
|
||||||
|
validTypes := queue.RegisteredTypesAsString()
|
||||||
|
Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes)
|
||||||
|
Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
|
||||||
|
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
|
||||||
|
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
|
||||||
|
Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
|
||||||
|
Queue.Workers = sec.Key("WORKER").MustInt(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseQueueConnStr parses a queue connection string
|
||||||
|
func ParseQueueConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
|
||||||
|
fields := strings.Fields(connStr)
|
||||||
|
for _, f := range fields {
|
||||||
|
items := strings.SplitN(f, "=", 2)
|
||||||
|
if len(items) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch strings.ToLower(items[0]) {
|
||||||
|
case "addrs":
|
||||||
|
addrs = items[1]
|
||||||
|
case "password":
|
||||||
|
password = items[1]
|
||||||
|
case "db":
|
||||||
|
dbIdx, err = strconv.Atoi(items[1])
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
Loading…
Reference in a new issue