Fix agent polling (#3378)

This commit is contained in:
Anbraten 2024-02-16 10:04:13 +01:00 committed by GitHub
parent bfff8dbc6f
commit 0e0d0188a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 85 additions and 33 deletions

View file

@ -137,6 +137,9 @@ func PatchAgent(c *gin.Context) {
} }
agent.Name = in.Name agent.Name = in.Name
agent.NoSchedule = in.NoSchedule agent.NoSchedule = in.NoSchedule
if agent.NoSchedule {
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
}
err = _store.AgentUpdate(agent) err = _store.AgentUpdate(agent)
if err != nil { if err != nil {
@ -214,6 +217,9 @@ func DeleteAgent(c *gin.Context) {
} }
} }
// kick workers to remove the agent from the queue
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
if err = _store.AgentDelete(agent); err != nil { if err = _store.AgentDelete(agent); err != nil {
c.String(http.StatusInternalServerError, "Error deleting user. %s", err) c.String(http.StatusInternalServerError, "Error deleting user. %s", err)
return return

View file

@ -56,20 +56,23 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
log.Debug().Msgf("agent connected: %s: polling", hostname) log.Debug().Msgf("agent connected: %s: polling", hostname)
} }
fn := createFilterFunc(agentFilter) filterFn := createFilterFunc(agentFilter)
for {
agent, err := s.getAgentFromContext(c) agent, err := s.getAgentFromContext(c)
if err != nil { if err != nil {
return nil, err return nil, err
} else if agent.NoSchedule { }
if agent.NoSchedule {
time.Sleep(1 * time.Second)
return nil, nil return nil, nil
} }
task, err := s.queue.Poll(c, agent.ID, fn) for {
if err != nil { // poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
if err != nil || task == nil {
return nil, err return nil, err
} else if task == nil {
return nil, nil
} }
if task.ShouldRun() { if task.ShouldRun() {
@ -78,6 +81,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
return workflow, err return workflow, err
} }
// task should not run, so mark it as done
if err := s.Done(c, task.ID, rpc.State{}); err != nil { if err := s.Done(c, task.ID, rpc.State{}); err != nil {
log.Error().Err(err).Msgf("mark task '%s' done failed", task.ID) log.Error().Err(err).Msgf("mark task '%s' done failed", task.ID)
} }

View file

@ -17,6 +17,7 @@ package queue
import ( import (
"container/list" "container/list"
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
@ -36,6 +37,7 @@ type worker struct {
agentID int64 agentID int64
filter FilterFn filter FilterFn
channel chan *model.Task channel chan *model.Task
stop context.CancelCauseFunc
} }
type fifo struct { type fifo struct {
@ -61,7 +63,7 @@ func New(_ context.Context) Queue {
} }
} }
// Push pushes an item to the tail of this queue. // Push pushes a task to the tail of this queue.
func (q *fifo) Push(_ context.Context, task *model.Task) error { func (q *fifo) Push(_ context.Context, task *model.Task) error {
q.Lock() q.Lock()
q.pending.PushBack(task) q.pending.PushBack(task)
@ -70,7 +72,7 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error {
return nil return nil
} }
// Push pushes an item to the tail of this queue. // PushAtOnce pushes multiple tasks to the tail of this queue.
func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
q.Lock() q.Lock()
for _, task := range tasks { for _, task := range tasks {
@ -81,13 +83,16 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
return nil return nil
} }
// Poll retrieves and removes the head of this queue. // Poll retrieves and removes a task head of this queue.
func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
q.Lock() q.Lock()
ctx, stop := context.WithCancelCause(c)
w := &worker{ w := &worker{
agentID: agentID, agentID: agentID,
channel: make(chan *model.Task, 1), channel: make(chan *model.Task, 1),
filter: f, filter: f,
stop: stop,
} }
q.workers[w] = struct{}{} q.workers[w] = struct{}{}
q.Unlock() q.Unlock()
@ -95,30 +100,30 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task,
for { for {
select { select {
case <-c.Done(): case <-ctx.Done():
q.Lock() q.Lock()
delete(q.workers, w) delete(q.workers, w)
q.Unlock() q.Unlock()
return nil, nil return nil, ctx.Err()
case t := <-w.channel: case t := <-w.channel:
return t, nil return t, nil
} }
} }
} }
// Done signals that the item is done executing. // Done signals the task is complete.
func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error { func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error {
return q.finished([]string{id}, exitStatus, nil) return q.finished([]string{id}, exitStatus, nil)
} }
// Error signals that the item is done executing with error. // Error signals the task is done with an error.
func (q *fifo) Error(_ context.Context, id string, err error) error { func (q *fifo) Error(_ context.Context, id string, err error) error {
return q.finished([]string{id}, model.StatusFailure, err) return q.finished([]string{id}, model.StatusFailure, err)
} }
// ErrorAtOnce signals that the item is done executing with error. // ErrorAtOnce signals multiple done are complete with an error.
func (q *fifo) ErrorAtOnce(_ context.Context, id []string, err error) error { func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error {
return q.finished(id, model.StatusFailure, err) return q.finished(ids, model.StatusFailure, err)
} }
func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error {
@ -145,7 +150,7 @@ func (q *fifo) Evict(c context.Context, id string) error {
return q.EvictAtOnce(c, []string{id}) return q.EvictAtOnce(c, []string{id})
} }
// Evict removes a pending task from the queue. // EvictAtOnce removes multiple pending tasks from the queue.
func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@ -200,7 +205,6 @@ func (q *fifo) Info(_ context.Context) InfoT {
stats.Stats.Pending = q.pending.Len() stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running) stats.Stats.Running = len(q.running)
stats.Stats.Complete = 0 // TODO: implement this
for e := q.pending.Front(); e != nil; e = e.Next() { for e := q.pending.Front(); e != nil; e = e.Next() {
task, _ := e.Value.(*model.Task) task, _ := e.Value.(*model.Task)
@ -219,12 +223,14 @@ func (q *fifo) Info(_ context.Context) InfoT {
return stats return stats
} }
// Pause stops the queue from handing out new work items in Poll
func (q *fifo) Pause() { func (q *fifo) Pause() {
q.Lock() q.Lock()
q.paused = true q.paused = true
q.Unlock() q.Unlock()
} }
// Resume starts the queue again.
func (q *fifo) Resume() { func (q *fifo) Resume() {
q.Lock() q.Lock()
q.paused = false q.paused = false
@ -232,6 +238,19 @@ func (q *fifo) Resume() {
go q.process() go q.process()
} }
// KickAgentWorkers kicks all workers for a given agent.
func (q *fifo) KickAgentWorkers(agentID int64) {
q.Lock()
defer q.Unlock()
for w := range q.workers {
if w.agentID == agentID {
w.stop(fmt.Errorf("worker was kicked"))
delete(q.workers, w)
}
}
}
// helper function that loops through the queue and attempts to // helper function that loops through the queue and attempts to
// match the item to a single subscriber. // match the item to a single subscriber.
func (q *fifo) process() { func (q *fifo) process() {

View file

@ -95,7 +95,7 @@ func (q *persistentQueue) Evict(c context.Context, id string) error {
return err return err
} }
// EvictAtOnce removes a pending task from the queue. // EvictAtOnce removes multiple pending tasks from the queue.
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
if err := q.Queue.EvictAtOnce(c, ids); err != nil { if err := q.Queue.EvictAtOnce(c, ids); err != nil {
return err return err
@ -107,3 +107,24 @@ func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
} }
return nil return nil
} }
// Error signals the task is done with an error.
func (q *persistentQueue) Error(c context.Context, id string, err error) error {
if err := q.Queue.Error(c, id, err); err != nil {
return err
}
return q.store.TaskDelete(id)
}
// ErrorAtOnce signals multiple tasks are done with an error.
func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error) error {
if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil {
return err
}
for _, id := range ids {
if err := q.store.TaskDelete(id); err != nil {
return err
}
}
return nil
}

View file

@ -40,7 +40,6 @@ type InfoT struct {
Pending int `json:"pending_count"` Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"` WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"` Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"` } `json:"stats"`
Paused bool `json:"paused"` Paused bool `json:"paused"`
} // @name InfoT } // @name InfoT
@ -73,7 +72,7 @@ type Queue interface {
// Push pushes a task to the tail of this queue. // Push pushes a task to the tail of this queue.
Push(c context.Context, task *model.Task) error Push(c context.Context, task *model.Task) error
// PushAtOnce pushes a task to the tail of this queue. // PushAtOnce pushes multiple tasks to the tail of this queue.
PushAtOnce(c context.Context, tasks []*model.Task) error PushAtOnce(c context.Context, tasks []*model.Task) error
// Poll retrieves and removes a task head of this queue. // Poll retrieves and removes a task head of this queue.
@ -85,17 +84,17 @@ type Queue interface {
// Done signals the task is complete. // Done signals the task is complete.
Done(c context.Context, id string, exitStatus model.StatusValue) error Done(c context.Context, id string, exitStatus model.StatusValue) error
// Error signals the task is complete with errors. // Error signals the task is done with an error.
Error(c context.Context, id string, err error) error Error(c context.Context, id string, err error) error
// ErrorAtOnce signals the task is complete with errors. // ErrorAtOnce signals multiple done are complete with an error.
ErrorAtOnce(c context.Context, id []string, err error) error ErrorAtOnce(c context.Context, ids []string, err error) error
// Evict removes a pending task from the queue. // Evict removes a pending task from the queue.
Evict(c context.Context, id string) error Evict(c context.Context, id string) error
// EvictAtOnce removes a pending task from the queue. // EvictAtOnce removes multiple pending tasks from the queue.
EvictAtOnce(c context.Context, id []string) error EvictAtOnce(c context.Context, ids []string) error
// Wait waits until the task is complete. // Wait waits until the task is complete.
Wait(c context.Context, id string) error Wait(c context.Context, id string) error
@ -106,6 +105,9 @@ type Queue interface {
// Pause stops the queue from handing out new work items in Poll // Pause stops the queue from handing out new work items in Poll
Pause() Pause()
// Resume starts the queue again, Poll returns new items // Resume starts the queue again.
Resume() Resume()
// KickAgentWorkers kicks all workers for a given agent.
KickAgentWorkers(agentID int64)
} }