From 0e0d0188a0f3f95cc38c7999a3d63a61b591734b Mon Sep 17 00:00:00 2001 From: Anbraten <6918444+anbraten@users.noreply.github.com> Date: Fri, 16 Feb 2024 10:04:13 +0100 Subject: [PATCH] Fix agent polling (#3378) --- server/api/agent.go | 6 ++++++ server/grpc/rpc.go | 28 ++++++++++++++----------- server/queue/fifo.go | 43 +++++++++++++++++++++++++++----------- server/queue/persistent.go | 23 +++++++++++++++++++- server/queue/queue.go | 18 +++++++++------- 5 files changed, 85 insertions(+), 33 deletions(-) diff --git a/server/api/agent.go b/server/api/agent.go index 5946e93a9..7ce9a760d 100644 --- a/server/api/agent.go +++ b/server/api/agent.go @@ -137,6 +137,9 @@ func PatchAgent(c *gin.Context) { } agent.Name = in.Name agent.NoSchedule = in.NoSchedule + if agent.NoSchedule { + server.Config.Services.Queue.KickAgentWorkers(agent.ID) + } err = _store.AgentUpdate(agent) if err != nil { @@ -214,6 +217,9 @@ func DeleteAgent(c *gin.Context) { } } + // kick workers to remove the agent from the queue + server.Config.Services.Queue.KickAgentWorkers(agent.ID) + if err = _store.AgentDelete(agent); err != nil { c.String(http.StatusInternalServerError, "Error deleting user. %s", err) return diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index a37c72d5b..c4293c6ab 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -56,20 +56,23 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er log.Debug().Msgf("agent connected: %s: polling", hostname) } - fn := createFilterFunc(agentFilter) - for { - agent, err := s.getAgentFromContext(c) - if err != nil { - return nil, err - } else if agent.NoSchedule { - return nil, nil - } + filterFn := createFilterFunc(agentFilter) - task, err := s.queue.Poll(c, agent.ID, fn) - if err != nil { + agent, err := s.getAgentFromContext(c) + if err != nil { + return nil, err + } + + if agent.NoSchedule { + time.Sleep(1 * time.Second) + return nil, nil + } + + for { + // poll blocks until a task is available or the context is canceled / worker is kicked + task, err := s.queue.Poll(c, agent.ID, filterFn) + if err != nil || task == nil { return nil, err - } else if task == nil { - return nil, nil } if task.ShouldRun() { @@ -78,6 +81,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er return workflow, err } + // task should not run, so mark it as done if err := s.Done(c, task.ID, rpc.State{}); err != nil { log.Error().Err(err).Msgf("mark task '%s' done failed", task.ID) } diff --git a/server/queue/fifo.go b/server/queue/fifo.go index f66c69c6d..29e24d103 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -17,6 +17,7 @@ package queue import ( "container/list" "context" + "fmt" "sync" "time" @@ -36,6 +37,7 @@ type worker struct { agentID int64 filter FilterFn channel chan *model.Task + stop context.CancelCauseFunc } type fifo struct { @@ -61,7 +63,7 @@ func New(_ context.Context) Queue { } } -// Push pushes an item to the tail of this queue. +// Push pushes a task to the tail of this queue. func (q *fifo) Push(_ context.Context, task *model.Task) error { q.Lock() q.pending.PushBack(task) @@ -70,7 +72,7 @@ func (q *fifo) Push(_ context.Context, task *model.Task) error { return nil } -// Push pushes an item to the tail of this queue. +// PushAtOnce pushes multiple tasks to the tail of this queue. func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.Lock() for _, task := range tasks { @@ -81,13 +83,16 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { return nil } -// Poll retrieves and removes the head of this queue. +// Poll retrieves and removes a task head of this queue. func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { q.Lock() + ctx, stop := context.WithCancelCause(c) + w := &worker{ agentID: agentID, channel: make(chan *model.Task, 1), filter: f, + stop: stop, } q.workers[w] = struct{}{} q.Unlock() @@ -95,30 +100,30 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, for { select { - case <-c.Done(): + case <-ctx.Done(): q.Lock() delete(q.workers, w) q.Unlock() - return nil, nil + return nil, ctx.Err() case t := <-w.channel: return t, nil } } } -// Done signals that the item is done executing. +// Done signals the task is complete. func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error { return q.finished([]string{id}, exitStatus, nil) } -// Error signals that the item is done executing with error. +// Error signals the task is done with an error. func (q *fifo) Error(_ context.Context, id string, err error) error { return q.finished([]string{id}, model.StatusFailure, err) } -// ErrorAtOnce signals that the item is done executing with error. -func (q *fifo) ErrorAtOnce(_ context.Context, id []string, err error) error { - return q.finished(id, model.StatusFailure, err) +// ErrorAtOnce signals multiple done are complete with an error. +func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error { + return q.finished(ids, model.StatusFailure, err) } func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { @@ -145,7 +150,7 @@ func (q *fifo) Evict(c context.Context, id string) error { return q.EvictAtOnce(c, []string{id}) } -// Evict removes a pending task from the queue. +// EvictAtOnce removes multiple pending tasks from the queue. func (q *fifo) EvictAtOnce(_ context.Context, ids []string) error { q.Lock() defer q.Unlock() @@ -200,7 +205,6 @@ func (q *fifo) Info(_ context.Context) InfoT { stats.Stats.Pending = q.pending.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) - stats.Stats.Complete = 0 // TODO: implement this for e := q.pending.Front(); e != nil; e = e.Next() { task, _ := e.Value.(*model.Task) @@ -219,12 +223,14 @@ func (q *fifo) Info(_ context.Context) InfoT { return stats } +// Pause stops the queue from handing out new work items in Poll func (q *fifo) Pause() { q.Lock() q.paused = true q.Unlock() } +// Resume starts the queue again. func (q *fifo) Resume() { q.Lock() q.paused = false @@ -232,6 +238,19 @@ func (q *fifo) Resume() { go q.process() } +// KickAgentWorkers kicks all workers for a given agent. +func (q *fifo) KickAgentWorkers(agentID int64) { + q.Lock() + defer q.Unlock() + + for w := range q.workers { + if w.agentID == agentID { + w.stop(fmt.Errorf("worker was kicked")) + delete(q.workers, w) + } + } +} + // helper function that loops through the queue and attempts to // match the item to a single subscriber. func (q *fifo) process() { diff --git a/server/queue/persistent.go b/server/queue/persistent.go index df8c2cea6..354b309cb 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -95,7 +95,7 @@ func (q *persistentQueue) Evict(c context.Context, id string) error { return err } -// EvictAtOnce removes a pending task from the queue. +// EvictAtOnce removes multiple pending tasks from the queue. func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { if err := q.Queue.EvictAtOnce(c, ids); err != nil { return err @@ -107,3 +107,24 @@ func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { } return nil } + +// Error signals the task is done with an error. +func (q *persistentQueue) Error(c context.Context, id string, err error) error { + if err := q.Queue.Error(c, id, err); err != nil { + return err + } + return q.store.TaskDelete(id) +} + +// ErrorAtOnce signals multiple tasks are done with an error. +func (q *persistentQueue) ErrorAtOnce(c context.Context, ids []string, err error) error { + if err := q.Queue.ErrorAtOnce(c, ids, err); err != nil { + return err + } + for _, id := range ids { + if err := q.store.TaskDelete(id); err != nil { + return err + } + } + return nil +} diff --git a/server/queue/queue.go b/server/queue/queue.go index 0ecd81181..9cb695c24 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -40,7 +40,6 @@ type InfoT struct { Pending int `json:"pending_count"` WaitingOnDeps int `json:"waiting_on_deps_count"` Running int `json:"running_count"` - Complete int `json:"completed_count"` } `json:"stats"` Paused bool `json:"paused"` } // @name InfoT @@ -73,7 +72,7 @@ type Queue interface { // Push pushes a task to the tail of this queue. Push(c context.Context, task *model.Task) error - // PushAtOnce pushes a task to the tail of this queue. + // PushAtOnce pushes multiple tasks to the tail of this queue. PushAtOnce(c context.Context, tasks []*model.Task) error // Poll retrieves and removes a task head of this queue. @@ -85,17 +84,17 @@ type Queue interface { // Done signals the task is complete. Done(c context.Context, id string, exitStatus model.StatusValue) error - // Error signals the task is complete with errors. + // Error signals the task is done with an error. Error(c context.Context, id string, err error) error - // ErrorAtOnce signals the task is complete with errors. - ErrorAtOnce(c context.Context, id []string, err error) error + // ErrorAtOnce signals multiple done are complete with an error. + ErrorAtOnce(c context.Context, ids []string, err error) error // Evict removes a pending task from the queue. Evict(c context.Context, id string) error - // EvictAtOnce removes a pending task from the queue. - EvictAtOnce(c context.Context, id []string) error + // EvictAtOnce removes multiple pending tasks from the queue. + EvictAtOnce(c context.Context, ids []string) error // Wait waits until the task is complete. Wait(c context.Context, id string) error @@ -106,6 +105,9 @@ type Queue interface { // Pause stops the queue from handing out new work items in Poll Pause() - // Resume starts the queue again, Poll returns new items + // Resume starts the queue again. Resume() + + // KickAgentWorkers kicks all workers for a given agent. + KickAgentWorkers(agentID int64) }