Move Queue creation behind new func that evaluates queue type (#4252)

This commit is contained in:
6543 2024-11-05 04:03:40 +01:00 committed by GitHub
parent 2fd0c38032
commit 3ab579c03f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 59 additions and 23 deletions

View file

@ -103,8 +103,11 @@ func checkSqliteFileExist(path string) error {
return err
}
func setupQueue(ctx context.Context, s store.Store) queue.Queue {
return queue.WithTaskStore(ctx, queue.New(ctx), s)
func setupQueue(ctx context.Context, s store.Store) (queue.Queue, error) {
return queue.New(ctx, queue.Config{
Backend: queue.TypeMemory,
Store: s,
})
}
func setupMembershipService(_ context.Context, _store store.Store) cache.MembershipService {
@ -143,18 +146,19 @@ func setupJWTSecret(_store store.Store) (string, error) {
return jwtSecret, nil
}
func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) error {
func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) {
// services
server.Config.Services.Queue = setupQueue(ctx, s)
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Membership = setupMembershipService(ctx, s)
serviceManager, err := services.NewManager(c, s, setup.Forge)
server.Config.Services.Queue, err = setupQueue(ctx, s)
if err != nil {
return fmt.Errorf("could not setup queue: %w", err)
}
server.Config.Services.Manager, err = services.NewManager(c, s, setup.Forge)
if err != nil {
return fmt.Errorf("could not setup service manager: %w", err)
}
server.Config.Services.Manager = serviceManager
server.Config.Services.LogStore, err = setupLogStore(c, s)
if err != nil {
return fmt.Errorf("could not setup log store: %w", err)

View file

@ -59,8 +59,8 @@ const processTimeInterval = 100 * time.Millisecond
var ErrWorkerKicked = fmt.Errorf("worker was kicked")
// New returns a new fifo queue.
func New(ctx context.Context) Queue {
// NewMemoryQueue returns a new fifo queue.
func NewMemoryQueue(ctx context.Context) Queue {
q := &fifo{
ctx: ctx,
workers: map[*worker]struct{}{},

View file

@ -32,7 +32,7 @@ func TestFifo(t *testing.T) {
want := &model.Task{ID: "1"}
ctx := context.Background()
q := New(ctx)
q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, want))
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
@ -55,7 +55,7 @@ func TestFifoExpire(t *testing.T) {
want := &model.Task{ID: "1"}
ctx, cancel := context.WithCancelCause(context.Background())
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
q.extension = 0
assert.NoError(t, q.Push(ctx, want))
info := q.Info(ctx)
@ -78,7 +78,7 @@ func TestFifoWait(t *testing.T) {
want := &model.Task{ID: "1"}
ctx := context.Background()
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.Push(ctx, want))
got, err := q.Poll(ctx, 1, filterFnTrue)
@ -101,7 +101,7 @@ func TestFifoEvict(t *testing.T) {
t1 := &model.Task{ID: "1"}
ctx := context.Background()
q := New(ctx)
q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, t1))
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
@ -125,7 +125,7 @@ func TestFifoDependencies(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1}))
got, err := q.Poll(ctx, 1, filterFnTrue)
@ -158,7 +158,7 @@ func TestFifoErrors(t *testing.T) {
RunOn: []string{"success", "failure"},
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, err := q.Poll(ctx, 1, filterFnTrue)
@ -194,7 +194,7 @@ func TestFifoErrors2(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
for i := 0; i < 2; i++ {
@ -234,7 +234,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
obtainedWorkCh := make(chan *model.Task)
@ -314,7 +314,7 @@ func TestFifoTransitiveErrors(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, err := q.Poll(ctx, 1, filterFnTrue)
@ -353,7 +353,7 @@ func TestFifoCancel(t *testing.T) {
RunOn: []string{"success", "failure"},
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
_, _ = q.Poll(ctx, 1, filterFnTrue)
@ -371,7 +371,7 @@ func TestFifoPause(t *testing.T) {
ID: "1",
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
var wg sync.WaitGroup
wg.Add(1)
go func() {
@ -402,7 +402,7 @@ func TestFifoPauseResume(t *testing.T) {
ID: "1",
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
q.Pause()
assert.NoError(t, q.Push(ctx, task1))
q.Resume()
@ -429,7 +429,7 @@ func TestWaitingVsPending(t *testing.T) {
RunOn: []string{"success", "failure"},
}
q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))
got, _ := q.Poll(ctx, 1, filterFnTrue)
@ -519,7 +519,7 @@ func TestShouldRun(t *testing.T) {
func TestFifoWithScoring(t *testing.T) {
ctx := context.Background()
q := New(ctx)
q := NewMemoryQueue(ctx)
// Create tasks with different labels
tasks := []*model.Task{

View file

@ -17,9 +17,11 @@ package queue
import (
"context"
"errors"
"fmt"
"strings"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
)
var (
@ -115,3 +117,33 @@ type Queue interface {
// KickAgentWorkers kicks all workers for a given agent.
KickAgentWorkers(agentID int64)
}
// Config holds the configuration for the queue.
type Config struct {
Backend Type
Store store.Store
}
// Queue type.
type Type string
const (
TypeMemory Type = "memory"
)
// New creates a new queue based on the provided configuration.
func New(ctx context.Context, config Config) (Queue, error) {
var q Queue
switch config.Backend {
case TypeMemory:
q = NewMemoryQueue(ctx)
if config.Store != nil {
q = WithTaskStore(ctx, q, config.Store)
}
default:
return nil, fmt.Errorf("unsupported queue backend: %s", config.Backend)
}
return q, nil
}