woodpecker/server/queue/queue.go

173 lines
4 KiB
Go
Raw Normal View History

2017-03-05 07:56:08 +00:00
package queue
import (
"context"
"errors"
"fmt"
"strings"
2017-03-05 07:56:08 +00:00
)
// Sentinel errors exposed by the queue package.
var (
	// ErrNotFound is the error value used when a task is not present in the queue.
	ErrNotFound = errors.New("queue: task not found")

	// ErrCancel is the error value used when a task has been cancelled.
	ErrCancel = errors.New("queue: task cancelled")
)
// Task defines a unit of work in the queue.
type Task struct {
	// ID identifies this task.
	ID string `json:"id,omitempty"`

	// Data is the actual data in the entry.
	Data []byte `json:"data"`

	// Labels represents the key-value pairs the entry is labeled with.
	Labels map[string]string `json:"labels,omitempty"`

	// Dependencies lists the IDs of the tasks this task depends on.
	Dependencies []string

	// DepStatus holds the exit status of the task's dependencies.
	// ShouldRun compares each value against StatusSuccess.
	// NOTE(review): presumably keyed by dependency task ID — confirm
	// against the queue implementation.
	DepStatus map[string]string

	// RunOn lists the dependency outcomes ("success" and/or "failure")
	// on which this task should run. An empty list is treated as
	// success-only by ShouldRun.
	RunOn []string
}
// ShouldRun reports whether the task should be executed (true) or skipped
// (false), given its RunOn setting and the recorded status of its
// dependencies.
func (t *Task) ShouldRun() bool {
	onFailure := runsOnFailure(t.RunOn)
	onSuccess := runsOnSuccess(t.RunOn)

	switch {
	case onFailure && onSuccess:
		// Both outcomes are accepted, so dependency status is irrelevant.
		return true

	case onSuccess:
		// Success only: any dependency whose status is not StatusSuccess
		// causes the task to be skipped.
		for _, depStatus := range t.DepStatus {
			if depStatus != StatusSuccess {
				return false
			}
		}
		return true

	case onFailure:
		// Failure only: any dependency whose status is StatusSuccess
		// causes the task to be skipped.
		for _, depStatus := range t.DepStatus {
			if depStatus == StatusSuccess {
				return false
			}
		}
		return true

	default:
		// RunOn names neither "success" nor "failure".
		return false
	}
}
// String renders the task as "<ID> (<Dependencies>) - <DepStatus>", using
// fmt's default %s formatting for the slice and the map.
func (t *Task) String() string {
	return fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus)
}
// runsOnFailure reports whether runsOn contains the entry "failure".
// Unlike runsOnSuccess, an empty or nil list yields false.
func runsOnFailure(runsOn []string) bool {
	for _, status := range runsOn {
		if status == "failure" {
			return true
		}
	}
	return false
}
// runsOnSuccess reports whether runsOn permits running on success: that is
// the case when the list is empty (or nil) or when it contains the entry
// "success".
func runsOnSuccess(runsOn []string) bool {
	// An empty list counts as a match without inspecting any entry.
	matched := len(runsOn) == 0
	for i := 0; !matched && i < len(runsOn); i++ {
		matched = runsOn[i] == "success"
	}
	return matched
}
2017-03-05 07:56:08 +00:00
// InfoT provides runtime information.
type InfoT struct {
Pending []*Task `json:"pending"`
WaitingOnDeps []*Task `json:"waiting_on_deps"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
2017-03-05 07:56:08 +00:00
} `json:"stats"`
2019-06-28 06:29:57 +00:00
Paused bool
2017-03-05 07:56:08 +00:00
}
// String concatenates the string form of every task, each preceded by a
// tab character, in the order: pending, running, waiting on dependencies.
func (t *InfoT) String() string {
	var out strings.Builder
	groups := [][]*Task{t.Pending, t.Running, t.WaitingOnDeps}
	for _, group := range groups {
		for _, entry := range group {
			out.WriteString("\t" + entry.String())
		}
	}
	return out.String()
}
2017-03-05 07:56:08 +00:00
// Filter is a predicate over queued tasks. When it returns false for a
// Task, that Task is skipped and not returned to the subscriber.
type Filter func(task *Task) bool
// Queue defines a task queue for scheduling tasks among
// a pool of workers.
type Queue interface {
2019-06-13 15:38:19 +00:00
// Push pushes a task to the tail of this queue.
2017-03-05 07:56:08 +00:00
Push(c context.Context, task *Task) error
// PushAtOnce pushes a task to the tail of this queue.
2019-06-13 15:38:19 +00:00
PushAtOnce(c context.Context, tasks []*Task) error
2017-03-05 07:56:08 +00:00
// Poll retrieves and removes a task head of this queue.
Poll(c context.Context, f Filter) (*Task, error)
// Extend extends the deadline for a task.
Extend(c context.Context, id string) error
// Done signals the task is complete.
Done(c context.Context, exitStatus string, id string) error
2017-03-05 07:56:08 +00:00
// Error signals the task is complete with errors.
Error(c context.Context, id string, err error) error
// ErrorAtOnce signals the task is complete with errors.
2019-09-16 13:18:15 +00:00
ErrorAtOnce(c context.Context, id []string, err error) error
2017-04-06 16:04:25 +00:00
// Evict removes a pending task from the queue.
Evict(c context.Context, id string) error
// EvictAtOnce removes a pending task from the queue.
2019-09-16 13:18:15 +00:00
EvictAtOnce(c context.Context, id []string) error
2017-03-05 07:56:08 +00:00
// Wait waits until the task is complete.
Wait(c context.Context, id string) error
// Info returns internal queue information.
Info(c context.Context) InfoT
2019-06-28 06:29:57 +00:00
// Pause stops the queue from handing out new work items in Poll
2019-06-28 06:29:57 +00:00
Pause()
// Resume starts the queue again, Poll returns new items
2019-06-28 06:29:57 +00:00
Resume()
2017-03-05 07:56:08 +00:00
}