mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-01-01 21:28:44 +00:00
Save agent-id for tasks and add endpoint to get agent tasks (#1631)
Save which agent is running a task. This is now visible in the admin UI in the queue and in the agent details screen. # changes - [x] save id of agent executing a task - [x] add endpoint to get tasks of an agent for #999 - [x] show assigned agent-id in queue - [x] (offtopic) use same colors for queue stats and icons (similar to the ones used by pipelines) - [x] (offtopic) use badges for queue labels & dependencies ![image](https://user-images.githubusercontent.com/6918444/226541271-23f3b7b2-7a08-45c2-a2e6-1c7fc31b6f1d.png)
This commit is contained in:
parent
46452fbd84
commit
f13ffc2c8f
17 changed files with 124 additions and 78 deletions
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/gorilla/securecookie"
|
"github.com/gorilla/securecookie"
|
||||||
|
|
||||||
|
"github.com/woodpecker-ci/woodpecker/server"
|
||||||
"github.com/woodpecker-ci/woodpecker/server/model"
|
"github.com/woodpecker-ci/woodpecker/server/model"
|
||||||
"github.com/woodpecker-ci/woodpecker/server/router/middleware/session"
|
"github.com/woodpecker-ci/woodpecker/server/router/middleware/session"
|
||||||
"github.com/woodpecker-ci/woodpecker/server/store"
|
"github.com/woodpecker-ci/woodpecker/server/store"
|
||||||
|
@ -51,6 +52,30 @@ func GetAgent(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, agent)
|
c.JSON(http.StatusOK, agent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetAgentTasks(c *gin.Context) {
|
||||||
|
agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
agent, err := store.FromContext(c).AgentFind(agentID)
|
||||||
|
if err != nil {
|
||||||
|
c.String(http.StatusNotFound, "Cannot find agent. %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks := []*model.Task{}
|
||||||
|
info := server.Config.Services.Queue.Info(c)
|
||||||
|
for _, task := range info.Running {
|
||||||
|
if task.AgentID == agent.ID {
|
||||||
|
tasks = append(tasks, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, tasks)
|
||||||
|
}
|
||||||
|
|
||||||
func PatchAgent(c *gin.Context) {
|
func PatchAgent(c *gin.Context) {
|
||||||
_store := store.FromContext(c)
|
_store := store.FromContext(c)
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
task, err := s.queue.Poll(c, fn)
|
task, err := s.queue.Poll(c, agent.ID, fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if task == nil {
|
} else if task == nil {
|
||||||
|
@ -131,14 +131,6 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata, ok := grpcMetadata.FromIncomingContext(c)
|
|
||||||
if ok {
|
|
||||||
hostname, ok := metadata["hostname"]
|
|
||||||
if ok && len(hostname) != 0 {
|
|
||||||
step.Machine = hostname[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Msgf("error: cannot find repo with id %d: %s", currentPipeline.RepoID, err)
|
log.Error().Msgf("error: cannot find repo with id %d: %s", currentPipeline.RepoID, err)
|
||||||
|
@ -258,13 +250,12 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
|
||||||
log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err)
|
log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
metadata, ok := grpcMetadata.FromIncomingContext(c)
|
|
||||||
if ok {
|
agent, err := s.getAgentFromContext(c)
|
||||||
hostname, ok := metadata["hostname"]
|
if err != nil {
|
||||||
if ok && len(hostname) != 0 {
|
return err
|
||||||
step.Machine = hostname[0]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
step.AgentID = agent.ID
|
||||||
|
|
||||||
currentPipeline, err := s.store.GetPipeline(step.PipelineID)
|
currentPipeline, err := s.store.GetPipeline(step.PipelineID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -42,7 +42,7 @@ type Step struct {
|
||||||
ExitCode int `json:"exit_code" xorm:"step_exit_code"`
|
ExitCode int `json:"exit_code" xorm:"step_exit_code"`
|
||||||
Started int64 `json:"start_time,omitempty" xorm:"step_started"`
|
Started int64 `json:"start_time,omitempty" xorm:"step_started"`
|
||||||
Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"`
|
Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"`
|
||||||
Machine string `json:"machine,omitempty" xorm:"step_machine"`
|
AgentID int64 `json:"agent_id,omitempty" xorm:"step_agent_id"`
|
||||||
Platform string `json:"platform,omitempty" xorm:"step_platform"`
|
Platform string `json:"platform,omitempty" xorm:"step_platform"`
|
||||||
Environ map[string]string `json:"environ,omitempty" xorm:"json 'step_environ'"`
|
Environ map[string]string `json:"environ,omitempty" xorm:"json 'step_environ'"`
|
||||||
Children []*Step `json:"children,omitempty" xorm:"-"`
|
Children []*Step `json:"children,omitempty" xorm:"-"`
|
||||||
|
|
|
@ -26,14 +26,6 @@ type TaskStore interface {
|
||||||
TaskDelete(string) error
|
TaskDelete(string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaskStatusValue string
|
|
||||||
|
|
||||||
const (
|
|
||||||
TaskStatusSkipped TaskStatusValue = "skipped"
|
|
||||||
TaskStatusSuccess TaskStatusValue = "success"
|
|
||||||
TaskStatusFailure TaskStatusValue = "failure"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Task defines scheduled pipeline Task.
|
// Task defines scheduled pipeline Task.
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID string `json:"id" xorm:"PK UNIQUE 'task_id'"`
|
ID string `json:"id" xorm:"PK UNIQUE 'task_id'"`
|
||||||
|
@ -42,6 +34,7 @@ type Task struct {
|
||||||
Dependencies []string `json:"dependencies" xorm:"json 'task_dependencies'"`
|
Dependencies []string `json:"dependencies" xorm:"json 'task_dependencies'"`
|
||||||
RunOn []string `json:"run_on" xorm:"json 'task_run_on'"`
|
RunOn []string `json:"run_on" xorm:"json 'task_run_on'"`
|
||||||
DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'task_dep_status'"`
|
DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'task_dep_status'"`
|
||||||
|
AgentID int64 `json:"agent_id" xorm:"'agent_id'"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TableName return database table name for xorm
|
// TableName return database table name for xorm
|
||||||
|
|
|
@ -34,6 +34,7 @@ type entry struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type worker struct {
|
type worker struct {
|
||||||
|
agentID int64
|
||||||
filter FilterFn
|
filter FilterFn
|
||||||
channel chan *model.Task
|
channel chan *model.Task
|
||||||
}
|
}
|
||||||
|
@ -82,9 +83,10 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll retrieves and removes the head of this queue.
|
// Poll retrieves and removes the head of this queue.
|
||||||
func (q *fifo) Poll(c context.Context, f FilterFn) (*model.Task, error) {
|
func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
|
||||||
q.Lock()
|
q.Lock()
|
||||||
w := &worker{
|
w := &worker{
|
||||||
|
agentID: agentID,
|
||||||
channel: make(chan *model.Task, 1),
|
channel: make(chan *model.Task, 1),
|
||||||
filter: f,
|
filter: f,
|
||||||
}
|
}
|
||||||
|
@ -254,6 +256,7 @@ func (q *fifo) process() {
|
||||||
q.filterWaiting()
|
q.filterWaiting()
|
||||||
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
|
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
|
||||||
task := pending.Value.(*model.Task)
|
task := pending.Value.(*model.Task)
|
||||||
|
task.AgentID = worker.agentID
|
||||||
delete(q.workers, worker)
|
delete(q.workers, worker)
|
||||||
q.pending.Remove(pending)
|
q.pending.Remove(pending)
|
||||||
q.running[task.ID] = &entry{
|
q.running[task.ID] = &entry{
|
||||||
|
|
|
@ -25,7 +25,7 @@ func TestFifo(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != want {
|
if got != want {
|
||||||
t.Errorf("expect task returned form queue")
|
t.Errorf("expect task returned form queue")
|
||||||
return
|
return
|
||||||
|
@ -65,7 +65,7 @@ func TestFifoExpire(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != want {
|
if got != want {
|
||||||
t.Errorf("expect task returned form queue")
|
t.Errorf("expect task returned form queue")
|
||||||
return
|
return
|
||||||
|
@ -84,7 +84,7 @@ func TestFifoWait(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.Push(noContext, want))
|
assert.NoError(t, q.Push(noContext, want))
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != want {
|
if got != want {
|
||||||
t.Errorf("expect task returned form queue")
|
t.Errorf("expect task returned form queue")
|
||||||
return
|
return
|
||||||
|
@ -137,7 +137,7 @@ func TestFifoDependencies(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1}))
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task1 {
|
if got != task1 {
|
||||||
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
||||||
return
|
return
|
||||||
|
@ -145,7 +145,7 @@ func TestFifoDependencies(t *testing.T) {
|
||||||
|
|
||||||
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
|
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
|
||||||
|
|
||||||
got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task2 {
|
if got != task2 {
|
||||||
t.Errorf("expect task2 returned from queue")
|
t.Errorf("expect task2 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -173,7 +173,7 @@ func TestFifoErrors(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task1 {
|
if got != task1 {
|
||||||
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
||||||
return
|
return
|
||||||
|
@ -181,7 +181,7 @@ func TestFifoErrors(t *testing.T) {
|
||||||
|
|
||||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||||
|
|
||||||
got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task2 {
|
if got != task2 {
|
||||||
t.Errorf("expect task2 returned from queue")
|
t.Errorf("expect task2 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -192,7 +192,7 @@ func TestFifoErrors(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task3 {
|
if got != task3 {
|
||||||
t.Errorf("expect task3 returned from queue")
|
t.Errorf("expect task3 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -223,7 +223,7 @@ func TestFifoErrors2(t *testing.T) {
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
||||||
|
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task1 && got != task2 {
|
if got != task1 && got != task2 {
|
||||||
t.Errorf("expect task1 or task2 returned from queue as task3 depends on them")
|
t.Errorf("expect task1 or task2 returned from queue as task3 depends on them")
|
||||||
return
|
return
|
||||||
|
@ -237,7 +237,7 @@ func TestFifoErrors2(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task3 {
|
if got != task3 {
|
||||||
t.Errorf("expect task3 returned from queue")
|
t.Errorf("expect task3 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -275,7 +275,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
for {
|
for {
|
||||||
fmt.Printf("Worker %d started\n", i)
|
fmt.Printf("Worker %d started\n", i)
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
obtainedWorkCh <- got
|
obtainedWorkCh <- got
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
@ -299,7 +299,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
fmt.Printf("Worker spawned\n")
|
fmt.Printf("Worker spawned\n")
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
obtainedWorkCh <- got
|
obtainedWorkCh <- got
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -313,7 +313,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
fmt.Printf("Worker spawned\n")
|
fmt.Printf("Worker spawned\n")
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
obtainedWorkCh <- got
|
obtainedWorkCh <- got
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -359,14 +359,14 @@ func TestFifoTransitiveErrors(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task1 {
|
if got != task1 {
|
||||||
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
t.Errorf("expect task1 returned from queue as task2 depends on it")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||||
|
|
||||||
got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task2 {
|
if got != task2 {
|
||||||
t.Errorf("expect task2 returned from queue")
|
t.Errorf("expect task2 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -377,7 +377,7 @@ func TestFifoTransitiveErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped))
|
assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped))
|
||||||
|
|
||||||
got, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
if got != task3 {
|
if got != task3 {
|
||||||
t.Errorf("expect task3 returned from queue")
|
t.Errorf("expect task3 returned from queue")
|
||||||
return
|
return
|
||||||
|
@ -409,7 +409,7 @@ func TestFifoCancel(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
||||||
|
|
||||||
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled")))
|
assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled")))
|
||||||
assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled")))
|
assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled")))
|
||||||
assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled")))
|
assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled")))
|
||||||
|
@ -430,7 +430,7 @@ func TestFifoPause(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -450,7 +450,7 @@ func TestFifoPause(t *testing.T) {
|
||||||
q.Pause()
|
q.Pause()
|
||||||
assert.NoError(t, q.Push(noContext, task1))
|
assert.NoError(t, q.Push(noContext, task1))
|
||||||
q.Resume()
|
q.Resume()
|
||||||
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFifoPauseResume(t *testing.T) {
|
func TestFifoPauseResume(t *testing.T) {
|
||||||
|
@ -463,7 +463,7 @@ func TestFifoPauseResume(t *testing.T) {
|
||||||
assert.NoError(t, q.Push(noContext, task1))
|
assert.NoError(t, q.Push(noContext, task1))
|
||||||
q.Resume()
|
q.Resume()
|
||||||
|
|
||||||
_, _ = q.Poll(noContext, func(*model.Task) bool { return true })
|
_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWaitingVsPending(t *testing.T) {
|
func TestWaitingVsPending(t *testing.T) {
|
||||||
|
@ -487,7 +487,7 @@ func TestWaitingVsPending(t *testing.T) {
|
||||||
q := New(context.Background()).(*fifo)
|
q := New(context.Background()).(*fifo)
|
||||||
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))
|
||||||
|
|
||||||
got, _ := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
|
|
||||||
info := q.Info(noContext)
|
info := q.Info(noContext)
|
||||||
if info.Stats.WaitingOnDeps != 2 {
|
if info.Stats.WaitingOnDeps != 2 {
|
||||||
|
@ -495,7 +495,7 @@ func TestWaitingVsPending(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")))
|
||||||
got, err := q.Poll(noContext, func(*model.Task) bool { return true })
|
got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.EqualValues(t, task2, got)
|
assert.EqualValues(t, task2, got)
|
||||||
|
|
||||||
|
|
|
@ -73,8 +73,8 @@ func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll retrieves and removes a task head of this queue.
|
// Poll retrieves and removes a task head of this queue.
|
||||||
func (q *persistentQueue) Poll(c context.Context, f FilterFn) (*model.Task, error) {
|
func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
|
||||||
task, err := q.Queue.Poll(c, f)
|
task, err := q.Queue.Poll(c, agentID, f)
|
||||||
if task != nil {
|
if task != nil {
|
||||||
log.Debug().Msgf("pull queue item: %s: remove from backup", task.ID)
|
log.Debug().Msgf("pull queue item: %s: remove from backup", task.ID)
|
||||||
if derr := q.store.TaskDelete(task.ID); derr != nil {
|
if derr := q.store.TaskDelete(task.ID); derr != nil {
|
||||||
|
|
|
@ -63,7 +63,7 @@ type Queue interface {
|
||||||
PushAtOnce(c context.Context, tasks []*model.Task) error
|
PushAtOnce(c context.Context, tasks []*model.Task) error
|
||||||
|
|
||||||
// Poll retrieves and removes a task head of this queue.
|
// Poll retrieves and removes a task head of this queue.
|
||||||
Poll(c context.Context, f FilterFn) (*model.Task, error)
|
Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)
|
||||||
|
|
||||||
// Extend extends the deadline for a task.
|
// Extend extends the deadline for a task.
|
||||||
Extend(c context.Context, id string) error
|
Extend(c context.Context, id string) error
|
||||||
|
|
|
@ -173,6 +173,7 @@ func apiRoutes(e *gin.Engine) {
|
||||||
agentBase.GET("", api.GetAgents)
|
agentBase.GET("", api.GetAgents)
|
||||||
agentBase.POST("", api.PostAgent)
|
agentBase.POST("", api.PostAgent)
|
||||||
agentBase.GET("/:agent", api.GetAgent)
|
agentBase.GET("/:agent", api.GetAgent)
|
||||||
|
agentBase.GET("/:agent/tasks", api.GetAgentTasks)
|
||||||
agentBase.PATCH("/:agent", api.PatchAgent)
|
agentBase.PATCH("/:agent", api.PatchAgent)
|
||||||
agentBase.DELETE("/:agent", api.DeleteAgent)
|
agentBase.DELETE("/:agent", api.DeleteAgent)
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestStepFind(t *testing.T) {
|
||||||
State: model.StatusSuccess,
|
State: model.StatusSuccess,
|
||||||
Error: "pc load letter",
|
Error: "pc load letter",
|
||||||
ExitCode: 255,
|
ExitCode: 255,
|
||||||
Machine: "localhost",
|
AgentID: 1,
|
||||||
Platform: "linux/amd64",
|
Platform: "linux/amd64",
|
||||||
Environ: map[string]string{"GOLANG": "tip"},
|
Environ: map[string]string{"GOLANG": "tip"},
|
||||||
},
|
},
|
||||||
|
@ -147,7 +147,7 @@ func TestStepUpdate(t *testing.T) {
|
||||||
State: "pending",
|
State: "pending",
|
||||||
Error: "pc load letter",
|
Error: "pc load letter",
|
||||||
ExitCode: 255,
|
ExitCode: 255,
|
||||||
Machine: "localhost",
|
AgentID: 1,
|
||||||
Platform: "linux/amd64",
|
Platform: "linux/amd64",
|
||||||
Environ: map[string]string{"GOLANG": "tip"},
|
Environ: map[string]string{"GOLANG": "tip"},
|
||||||
}
|
}
|
||||||
|
|
2
web/components.d.ts
vendored
2
web/components.d.ts
vendored
|
@ -63,10 +63,10 @@ declare module '@vue/runtime-core' {
|
||||||
IMdiGestureTap: typeof import('~icons/mdi/gesture-tap')['default']
|
IMdiGestureTap: typeof import('~icons/mdi/gesture-tap')['default']
|
||||||
IMdiGithub: typeof import('~icons/mdi/github')['default']
|
IMdiGithub: typeof import('~icons/mdi/github')['default']
|
||||||
IMdiLoading: typeof import('~icons/mdi/loading')['default']
|
IMdiLoading: typeof import('~icons/mdi/loading')['default']
|
||||||
IMdiSync: typeof import('~icons/mdi/sync')['default']
|
|
||||||
IMdiSourceBranch: typeof import('~icons/mdi/source-branch')['default']
|
IMdiSourceBranch: typeof import('~icons/mdi/source-branch')['default']
|
||||||
IMdisourceCommit: typeof import('~icons/mdi/source-commit')['default']
|
IMdisourceCommit: typeof import('~icons/mdi/source-commit')['default']
|
||||||
IMdiSourcePull: typeof import('~icons/mdi/source-pull')['default']
|
IMdiSourcePull: typeof import('~icons/mdi/source-pull')['default']
|
||||||
|
IMdiSync: typeof import('~icons/mdi/sync')['default']
|
||||||
IMdiTagOutline: typeof import('~icons/mdi/tag-outline')['default']
|
IMdiTagOutline: typeof import('~icons/mdi/tag-outline')['default']
|
||||||
InputField: typeof import('./src/components/form/InputField.vue')['default']
|
InputField: typeof import('./src/components/form/InputField.vue')['default']
|
||||||
IPhGitlabLogoSimpleFill: typeof import('~icons/ph/gitlab-logo-simple-fill')['default']
|
IPhGitlabLogoSimpleFill: typeof import('~icons/ph/gitlab-logo-simple-fill')['default']
|
||||||
|
|
|
@ -368,7 +368,9 @@
|
||||||
"version": "Version",
|
"version": "Version",
|
||||||
"last_contact": "Last contact",
|
"last_contact": "Last contact",
|
||||||
"never": "Never",
|
"never": "Never",
|
||||||
"delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore."
|
"delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore.",
|
||||||
|
"edit_agent": "Edit agent",
|
||||||
|
"delete_agent": "Delete agent"
|
||||||
},
|
},
|
||||||
"queue": {
|
"queue": {
|
||||||
"queue": "Queue",
|
"queue": "Queue",
|
||||||
|
@ -381,6 +383,8 @@
|
||||||
"task_running": "Task is running",
|
"task_running": "Task is running",
|
||||||
"task_pending": "Task is pending",
|
"task_pending": "Task is pending",
|
||||||
"task_waiting_on_deps": "Task is waiting on dependencies",
|
"task_waiting_on_deps": "Task is waiting on dependencies",
|
||||||
|
"agent": "agent",
|
||||||
|
"waiting_for": "waiting for",
|
||||||
"stats": {
|
"stats": {
|
||||||
"completed_count": "Completed Tasks",
|
"completed_count": "Completed Tasks",
|
||||||
"worker_count": "Free",
|
"worker_count": "Free",
|
||||||
|
|
|
@ -12,10 +12,7 @@
|
||||||
start-icon="back"
|
start-icon="back"
|
||||||
@click="selectedAgent = undefined"
|
@click="selectedAgent = undefined"
|
||||||
/>
|
/>
|
||||||
<template v-else>
|
<Button v-else class="ml-auto" :text="$t('admin.settings.agents.add')" start-icon="plus" @click="showAddAgent" />
|
||||||
<Button class="ml-auto" :text="$t('admin.settings.agents.add')" start-icon="plus" @click="showAddAgent" />
|
|
||||||
<Button class="ml-2" start-icon="refresh" @click="loadAgents" />
|
|
||||||
</template>
|
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div v-if="!selectedAgent" class="space-y-4 text-color">
|
<div v-if="!selectedAgent" class="space-y-4 text-color">
|
||||||
|
@ -23,15 +20,21 @@
|
||||||
<span>{{ agent.name || `Agent ${agent.id}` }}</span>
|
<span>{{ agent.name || `Agent ${agent.id}` }}</span>
|
||||||
<span class="ml-auto">
|
<span class="ml-auto">
|
||||||
<span class="hidden md:inline-block space-x-2">
|
<span class="hidden md:inline-block space-x-2">
|
||||||
<Badge :label="$t('admin.settings.agents.platform.badge')" :value="agent.platform" />
|
<Badge :label="$t('admin.settings.agents.platform.badge')" :value="agent.platform || '???'" />
|
||||||
<Badge :label="$t('admin.settings.agents.backend.badge')" :value="agent.backend" />
|
<Badge :label="$t('admin.settings.agents.backend.badge')" :value="agent.backend || '???'" />
|
||||||
<Badge :label="$t('admin.settings.agents.capacity.badge')" :value="agent.capacity" />
|
<Badge :label="$t('admin.settings.agents.capacity.badge')" :value="agent.capacity || '???'" />
|
||||||
</span>
|
</span>
|
||||||
<span class="ml-2">{{ agent.last_contact ? timeAgo.format(agent.last_contact * 1000) : 'never' }}</span>
|
<span class="ml-2">{{ agent.last_contact ? timeAgo.format(agent.last_contact * 1000) : 'never' }}</span>
|
||||||
</span>
|
</span>
|
||||||
<IconButton icon="edit" class="ml-2 w-8 h-8" @click="editAgent(agent)" />
|
<IconButton
|
||||||
|
icon="edit"
|
||||||
|
:title="$t('admin.settings.agents.edit_agent')"
|
||||||
|
class="ml-2 w-8 h-8"
|
||||||
|
@click="editAgent(agent)"
|
||||||
|
/>
|
||||||
<IconButton
|
<IconButton
|
||||||
icon="trash"
|
icon="trash"
|
||||||
|
:title="$t('admin.settings.agents.delete_agent')"
|
||||||
class="ml-2 w-8 h-8 hover:text-red-400 hover:dark:text-red-500"
|
class="ml-2 w-8 h-8 hover:text-red-400 hover:dark:text-red-500"
|
||||||
:is-loading="isDeleting"
|
:is-loading="isDeleting"
|
||||||
@click="deleteAgent(agent)"
|
@click="deleteAgent(agent)"
|
||||||
|
@ -114,11 +117,12 @@
|
||||||
|
|
||||||
<script lang="ts" setup>
|
<script lang="ts" setup>
|
||||||
import { cloneDeep } from 'lodash';
|
import { cloneDeep } from 'lodash';
|
||||||
import { computed, onMounted, ref } from 'vue';
|
import { computed, onBeforeUnmount, onMounted, ref } from 'vue';
|
||||||
import { useI18n } from 'vue-i18n';
|
import { useI18n } from 'vue-i18n';
|
||||||
|
|
||||||
import Badge from '~/components/atomic/Badge.vue';
|
import Badge from '~/components/atomic/Badge.vue';
|
||||||
import Button from '~/components/atomic/Button.vue';
|
import Button from '~/components/atomic/Button.vue';
|
||||||
|
import IconButton from '~/components/atomic/IconButton.vue';
|
||||||
import ListItem from '~/components/atomic/ListItem.vue';
|
import ListItem from '~/components/atomic/ListItem.vue';
|
||||||
import Checkbox from '~/components/form/Checkbox.vue';
|
import Checkbox from '~/components/form/Checkbox.vue';
|
||||||
import InputField from '~/components/form/InputField.vue';
|
import InputField from '~/components/form/InputField.vue';
|
||||||
|
@ -132,7 +136,7 @@ import timeAgo from '~/utils/timeAgo';
|
||||||
|
|
||||||
const apiClient = useApiClient();
|
const apiClient = useApiClient();
|
||||||
const notifications = useNotifications();
|
const notifications = useNotifications();
|
||||||
const i18n = useI18n();
|
const { t } = useI18n();
|
||||||
|
|
||||||
const agents = ref<Agent[]>([]);
|
const agents = ref<Agent[]>([]);
|
||||||
const selectedAgent = ref<Partial<Agent>>();
|
const selectedAgent = ref<Partial<Agent>>();
|
||||||
|
@ -154,7 +158,7 @@ const { doSubmit: saveAgent, isLoading: isSaving } = useAsyncAction(async () =>
|
||||||
selectedAgent.value = await apiClient.createAgent(selectedAgent.value);
|
selectedAgent.value = await apiClient.createAgent(selectedAgent.value);
|
||||||
}
|
}
|
||||||
notifications.notify({
|
notifications.notify({
|
||||||
title: i18n.t(isEditingAgent.value ? 'admin.settings.agents.saved' : 'admin.settings.agents.created'),
|
title: t(isEditingAgent.value ? 'admin.settings.agents.saved' : 'admin.settings.agents.created'),
|
||||||
type: 'success',
|
type: 'success',
|
||||||
});
|
});
|
||||||
await loadAgents();
|
await loadAgents();
|
||||||
|
@ -162,12 +166,12 @@ const { doSubmit: saveAgent, isLoading: isSaving } = useAsyncAction(async () =>
|
||||||
|
|
||||||
const { doSubmit: deleteAgent, isLoading: isDeleting } = useAsyncAction(async (_agent: Agent) => {
|
const { doSubmit: deleteAgent, isLoading: isDeleting } = useAsyncAction(async (_agent: Agent) => {
|
||||||
// eslint-disable-next-line no-restricted-globals, no-alert
|
// eslint-disable-next-line no-restricted-globals, no-alert
|
||||||
if (!confirm(i18n.t('admin.settings.agents.delete_confirm'))) {
|
if (!confirm(t('admin.settings.agents.delete_confirm'))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await apiClient.deleteAgent(_agent);
|
await apiClient.deleteAgent(_agent);
|
||||||
notifications.notify({ title: i18n.t('admin.settings.agents.deleted'), type: 'success' });
|
notifications.notify({ title: t('admin.settings.agents.deleted'), type: 'success' });
|
||||||
await loadAgents();
|
await loadAgents();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -179,7 +183,17 @@ function showAddAgent() {
|
||||||
selectedAgent.value = cloneDeep({ name: '' });
|
selectedAgent.value = cloneDeep({ name: '' });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const reloadInterval = ref<number>();
|
||||||
onMounted(async () => {
|
onMounted(async () => {
|
||||||
await loadAgents();
|
await loadAgents();
|
||||||
|
reloadInterval.value = window.setInterval(async () => {
|
||||||
|
await loadAgents();
|
||||||
|
}, 5000);
|
||||||
|
});
|
||||||
|
|
||||||
|
onBeforeUnmount(() => {
|
||||||
|
if (reloadInterval.value) {
|
||||||
|
window.clearInterval(reloadInterval.value);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
</script>
|
</script>
|
||||||
|
|
|
@ -57,15 +57,25 @@
|
||||||
"
|
"
|
||||||
:class="{
|
:class="{
|
||||||
'text-red-400': task.status === 'waiting_on_deps',
|
'text-red-400': task.status === 'waiting_on_deps',
|
||||||
'text-lime-400': task.status === 'running',
|
'text-blue-400': task.status === 'running',
|
||||||
'text-blue-400': task.status === 'pending',
|
'text-gray-400': task.status === 'pending',
|
||||||
}"
|
}"
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
<span class="ml-2">{{ task.id }}</span>
|
<span class="ml-2">{{ task.id }}</span>
|
||||||
<span class="flex ml-auto gap-2">
|
<span class="flex ml-auto gap-2">
|
||||||
<span>{{ task.labels }}</span>
|
<Badge v-if="task.agent_id !== 0" :label="$t('admin.settings.queue.agent')" :value="task.agent_id" />
|
||||||
<span>{{ task.dependencies }}</span>
|
<Badge
|
||||||
|
v-for="(value, label) in task.labels"
|
||||||
|
:key="label"
|
||||||
|
:label="label.toString()"
|
||||||
|
:value="value || '???'"
|
||||||
|
/>
|
||||||
|
<Badge
|
||||||
|
v-if="task.dependencies"
|
||||||
|
:label="$t('admin.settings.queue.waiting_for')"
|
||||||
|
:value="task.dependencies.join(', ')"
|
||||||
|
/>
|
||||||
</span>
|
</span>
|
||||||
</ListItem>
|
</ListItem>
|
||||||
</div>
|
</div>
|
||||||
|
@ -77,6 +87,7 @@
|
||||||
import { computed, onBeforeUnmount, onMounted, ref } from 'vue';
|
import { computed, onBeforeUnmount, onMounted, ref } from 'vue';
|
||||||
import { useI18n } from 'vue-i18n';
|
import { useI18n } from 'vue-i18n';
|
||||||
|
|
||||||
|
import Badge from '~/components/atomic/Badge.vue';
|
||||||
import Button from '~/components/atomic/Button.vue';
|
import Button from '~/components/atomic/Button.vue';
|
||||||
import Icon from '~/components/atomic/Icon.vue';
|
import Icon from '~/components/atomic/Icon.vue';
|
||||||
import ListItem from '~/components/atomic/ListItem.vue';
|
import ListItem from '~/components/atomic/ListItem.vue';
|
||||||
|
|
|
@ -81,28 +81,28 @@ const data = computed(() => {
|
||||||
label: t('admin.settings.queue.stats.worker_count'),
|
label: t('admin.settings.queue.stats.worker_count'),
|
||||||
value: props.stats.worker_count,
|
value: props.stats.worker_count,
|
||||||
perc: total.value > 0 ? (props.stats.worker_count / total.value) * 100 : 0,
|
perc: total.value > 0 ? (props.stats.worker_count / total.value) * 100 : 0,
|
||||||
color: 'bg-green-500',
|
color: 'bg-lime-400',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
key: 'running_count',
|
key: 'running_count',
|
||||||
label: t('admin.settings.queue.stats.running_count'),
|
label: t('admin.settings.queue.stats.running_count'),
|
||||||
value: props.stats.running_count,
|
value: props.stats.running_count,
|
||||||
perc: total.value > 0 ? (props.stats.running_count / total.value) * 100 : 100,
|
perc: total.value > 0 ? (props.stats.running_count / total.value) * 100 : 100,
|
||||||
color: 'bg-blue-500',
|
color: 'bg-blue-400',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
key: 'pending_count',
|
key: 'pending_count',
|
||||||
label: t('admin.settings.queue.stats.pending_count'),
|
label: t('admin.settings.queue.stats.pending_count'),
|
||||||
value: props.stats.pending_count,
|
value: props.stats.pending_count,
|
||||||
perc: total.value > 0 ? (props.stats.pending_count / total.value) * 100 : 0,
|
perc: total.value > 0 ? (props.stats.pending_count / total.value) * 100 : 0,
|
||||||
color: 'bg-red-500',
|
color: 'bg-gray-400',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
key: 'waiting_on_deps_count',
|
key: 'waiting_on_deps_count',
|
||||||
label: t('admin.settings.queue.stats.waiting_on_deps_count'),
|
label: t('admin.settings.queue.stats.waiting_on_deps_count'),
|
||||||
value: props.stats.waiting_on_deps_count,
|
value: props.stats.waiting_on_deps_count,
|
||||||
perc: total.value > 0 ? (props.stats.waiting_on_deps_count / total.value) * 100 : 0,
|
perc: total.value > 0 ? (props.stats.waiting_on_deps_count / total.value) * 100 : 0,
|
||||||
color: 'bg-red-800',
|
color: 'bg-red-400',
|
||||||
},
|
},
|
||||||
];
|
];
|
||||||
});
|
});
|
||||||
|
|
|
@ -3,12 +3,15 @@
|
||||||
<span
|
<span
|
||||||
class="pl-2 pr-1 py-0.5 bg-gray-800 text-gray-200 dark:bg-gray-600 border-2 border-gray-800 dark:border-gray-600 rounded-l-full"
|
class="pl-2 pr-1 py-0.5 bg-gray-800 text-gray-200 dark:bg-gray-600 border-2 border-gray-800 dark:border-gray-600 rounded-l-full"
|
||||||
:class="{
|
:class="{
|
||||||
'rounded-r-full pr-2': !value,
|
'rounded-r-full pr-2': value === undefined,
|
||||||
}"
|
}"
|
||||||
>
|
>
|
||||||
{{ label }}
|
{{ label }}
|
||||||
</span>
|
</span>
|
||||||
<span v-if="value" class="pl-1 pr-2 py-0.5 border-2 border-gray-800 dark:border-gray-600 rounded-r-full">
|
<span
|
||||||
|
v-if="value !== undefined"
|
||||||
|
class="pl-1 pr-2 py-0.5 border-2 border-gray-800 dark:border-gray-600 rounded-r-full"
|
||||||
|
>
|
||||||
{{ value }}
|
{{ value }}
|
||||||
</span>
|
</span>
|
||||||
</span>
|
</span>
|
||||||
|
|
|
@ -5,6 +5,7 @@ export type Task = {
|
||||||
dependencies: string[];
|
dependencies: string[];
|
||||||
dep_status: { [key: string]: string };
|
dep_status: { [key: string]: string };
|
||||||
run_on: string[];
|
run_on: string[];
|
||||||
|
agent_id: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type QueueStats = {
|
export type QueueStats = {
|
||||||
|
|
Loading…
Reference in a new issue