Fifo queue with dependencies

This commit is contained in:
Laszlo Fogas 2019-06-13 17:38:19 +02:00
parent 71f2486ae0
commit a3ec40d438
8 changed files with 224 additions and 45 deletions

View file

@ -7,6 +7,8 @@ import (
"runtime"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
type entry struct {
@ -50,6 +52,19 @@ func (q *fifo) Push(c context.Context, task *Task) error {
return nil
}
// Push pushes an item to the tail of this queue.
func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error {
q.Lock()
for _, task := range tasks {
q.pending.PushBack(task)
}
q.Unlock()
for range tasks {
go q.process()
}
return nil
}
// Poll retrieves and removes the head of this queue.
func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
q.Lock()
@ -187,21 +202,48 @@ func (q *fifo) process() {
loop:
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
item := e.Value.(*Task)
task := e.Value.(*Task)
logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
if q.depsInQueue(task) {
continue
}
for w := range q.workers {
if w.filter(item) {
if w.filter(task) {
delete(q.workers, w)
q.pending.Remove(e)
q.running[item.ID] = &entry{
item: item,
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
w.channel <- item
logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies)
w.channel <- task
break loop
}
}
}
}
func (q *fifo) depsInQueue(task *Task) bool {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
possibleDep, ok := e.Value.(*Task)
logrus.Debugf("queue: in queue right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies {
if ok && possibleDep.ID == dep {
return true
}
}
}
for possibleDepID := range q.running {
for _, dep := range task.Dependencies {
if possibleDepID == dep {
return true
}
}
}
return false
}

View file

@ -117,3 +117,30 @@ func TestFifoEvict(t *testing.T) {
t.Errorf("expect not found error when evicting item not in queue, got %s", err)
}
}
func TestFifoDependencies(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task1)
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
}

View file

@ -23,6 +23,9 @@ type Task struct {
// Labels represents the key-value pairs the entry is lebeled with.
Labels map[string]string `json:"labels,omitempty"`
// Task IDs this task depend on
Dependencies []string
}
// InfoT provides runtime information.
@ -44,9 +47,12 @@ type Filter func(*Task) bool
// Queue defines a task queue for scheduling tasks among
// a pool of workers.
type Queue interface {
// Push pushes an task to the tail of this queue.
// Push pushes a task to the tail of this queue.
Push(c context.Context, task *Task) error
// Push pushes a task to the tail of this queue.
PushAtOnce(c context.Context, tasks []*Task) error
// Poll retrieves and removes a task head of this queue.
Poll(c context.Context, f Filter) (*Task, error)

View file

@ -54,7 +54,7 @@ type persistentQueue struct {
store TaskStore
}
// Push pushes an task to the tail of this queue.
// Push pushes a task to the tail of this queue.
func (q *persistentQueue) Push(c context.Context, task *queue.Task) error {
q.store.TaskInsert(&Task{
ID: task.ID,
@ -68,6 +68,24 @@ func (q *persistentQueue) Push(c context.Context, task *queue.Task) error {
return err
}
// Push pushes multiple tasks to the tail of this queue.
func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*queue.Task) error {
for _, task := range tasks {
q.store.TaskInsert(&Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
})
}
err := q.Queue.PushAtOnce(c, tasks)
if err != nil {
for _, task := range tasks {
q.store.TaskDelete(task.ID)
}
}
return err
}
// Poll retrieves and removes a task head of this queue.
func (q *persistentQueue) Poll(c context.Context, f queue.Filter) (*queue.Task, error) {
task, err := q.Queue.Poll(c, f)

View file

@ -315,9 +315,9 @@ func PostApproval(c *gin.Context) {
}
}()
var yamls []string
var yamls []*remote.FileMeta
for _, y := range configs {
yamls = append(yamls, string(y.Data))
yamls = append(yamls, &remote.FileMeta{Data: []byte(y.Data), Name: y.Name})
}
b := procBuilder{
@ -478,6 +478,13 @@ func PostBuild(c *gin.Context) {
return
}
err = persistBuildConfigs(configs, build.ID)
if err != nil {
logrus.Errorf("failure to persist build config for %s. %s", repo.FullName, err)
c.AbortWithError(500, err)
return
}
// Read query string parameters into buildParams, exclude reserved params
var buildParams = map[string]string{}
for key, val := range c.Request.URL.Query() {
@ -508,9 +515,9 @@ func PostBuild(c *gin.Context) {
}
}
var yamls []string
var yamls []*remote.FileMeta
for _, y := range configs {
yamls = append(yamls, string(y.Data))
yamls = append(yamls, &remote.FileMeta{Data: []byte(y.Data), Name: y.Name})
}
b := procBuilder{
@ -589,6 +596,20 @@ func DeleteBuildLogs(c *gin.Context) {
c.String(204, "")
}
func persistBuildConfigs(configs []*model.Config, buildID int64) error {
for _, conf := range configs {
buildConfig := &model.BuildConfig{
ConfigID: conf.ID,
BuildID: buildID,
}
err := Config.Storage.Config.BuildConfigCreate(buildConfig)
if err != nil {
return err
}
}
return nil
}
var deleteStr = `[
{
"proc": %q,

View file

@ -180,8 +180,7 @@ func PostHook(c *gin.Context) {
// persist the build config for historical correctness, restarts, etc
for _, remoteYamlConfig := range remoteYamlConfigs {
conf, err := findOrPersistPipelineConfig(build, remoteYamlConfig.Data)
fmt.Println(conf)
_, err := findOrPersistPipelineConfig(build, remoteYamlConfig)
if err != nil {
logrus.Errorf("failure to find or persist build config for %s. %s", repo.FullName, err)
c.AbortWithError(500, err)
@ -230,11 +229,6 @@ func PostHook(c *gin.Context) {
}
}()
var yamls []string
for _, y := range remoteYamlConfigs {
yamls = append(yamls, string(y.Data))
}
b := procBuilder{
Repo: repo,
Curr: build,
@ -244,7 +238,7 @@ func PostHook(c *gin.Context) {
Regs: regs,
Envs: envs,
Link: httputil.GetURL(c.Request),
Yamls: yamls,
Yamls: remoteYamlConfigs,
}
buildItems, err := b.Build()
if err != nil {
@ -265,14 +259,15 @@ func PostHook(c *gin.Context) {
queueBuild(build, repo, buildItems)
}
func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig []byte) (*model.Config, error) {
sha := shasum(remoteYamlConfig)
func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig *remote.FileMeta) (*model.Config, error) {
sha := shasum(remoteYamlConfig.Data)
conf, err := Config.Storage.Config.ConfigFindIdentical(build.RepoID, sha)
if err != nil {
conf = &model.Config{
RepoID: build.RepoID,
Data: string(remoteYamlConfig),
Data: string(remoteYamlConfig.Data),
Hash: sha,
Name: sanitizePath(remoteYamlConfig.Name),
}
err = Config.Storage.Config.ConfigCreate(conf)
if err != nil {
@ -296,6 +291,7 @@ func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig []byte) (*
return conf, nil
}
// publishes message to UI clients
func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) {
message := pubsub.Message{
Labels: map[string]string{
@ -314,6 +310,7 @@ func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) {
}
func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) {
var tasks []*queue.Task
for _, item := range buildItems {
task := new(queue.Task)
task.ID = fmt.Sprint(item.Proc.ID)
@ -323,6 +320,7 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) {
}
task.Labels["platform"] = item.Platform
task.Labels["repo"] = repo.FullName
task.Dependencies = taskIds(item.DependsOn, buildItems)
task.Data, _ = json.Marshal(rpc.Pipeline{
ID: fmt.Sprint(item.Proc.ID),
@ -331,8 +329,21 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) {
})
Config.Services.Logs.Open(context.Background(), task.ID)
Config.Services.Queue.Push(context.Background(), task)
tasks = append(tasks, task)
}
Config.Services.Queue.PushAtOnce(context.Background(), tasks)
}
func taskIds(dependsOn []string, buildItems []*buildItem) []string {
taskIds := []string{}
for _, dep := range dependsOn {
for _, buildItem := range buildItems {
if buildItem.Proc.Name == dep {
taskIds = append(taskIds, fmt.Sprint(buildItem.Proc.ID))
}
}
}
return taskIds
}
func shasum(raw []byte) string {

View file

@ -28,6 +28,7 @@ import (
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/frontend/yaml/linter"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/frontend/yaml/matrix"
"github.com/laszlocph/drone-oss-08/model"
"github.com/laszlocph/drone-oss-08/remote"
)
// Takes the hook data and the yaml and returns in internal data model
@ -39,15 +40,16 @@ type procBuilder struct {
Secs []*model.Secret
Regs []*model.Registry
Link string
Yamls []string
Yamls []*remote.FileMeta
Envs map[string]string
}
type buildItem struct {
Proc *model.Proc
Platform string
Labels map[string]string
Config *backend.Config
Proc *model.Proc
Platform string
Labels map[string]string
DependsOn []string
Config *backend.Config
}
func (b *procBuilder) Build() ([]*buildItem, error) {
@ -55,7 +57,7 @@ func (b *procBuilder) Build() ([]*buildItem, error) {
for j, y := range b.Yamls {
// matrix axes
axes, err := matrix.ParseString(y)
axes, err := matrix.ParseString(string(y.Data))
if err != nil {
return nil, err
}
@ -70,6 +72,7 @@ func (b *procBuilder) Build() ([]*buildItem, error) {
PGID: j + i + 1,
State: model.StatusPending,
Environ: axis,
Name: sanitizePath(y.Name),
}
b.Curr.Procs = append(b.Curr.Procs, proc)
@ -77,13 +80,13 @@ func (b *procBuilder) Build() ([]*buildItem, error) {
environ := b.environmentVariables(metadata, axis)
// substitute vars
y, err := b.envsubst_(y, environ)
substituted, err := b.envsubst_(string(y.Data), environ)
if err != nil {
return nil, err
}
// parse yaml pipeline
parsed, err := yaml.ParseString(y)
parsed, err := yaml.ParseString(substituted)
if err != nil {
return nil, err
}
@ -101,10 +104,11 @@ func (b *procBuilder) Build() ([]*buildItem, error) {
ir := b.toInternalRepresentation(parsed, environ, metadata, proc.ID)
item := &buildItem{
Proc: proc,
Config: ir,
Labels: parsed.Labels,
Platform: metadata.Sys.Arch,
Proc: proc,
Config: ir,
Labels: parsed.Labels,
DependsOn: parsed.DependsOn,
Platform: metadata.Sys.Arch,
}
if item.Labels == nil {
item.Labels = map[string]string{}
@ -289,3 +293,10 @@ func metadataFromStruct(repo *model.Repo, build, last *model.Build, proc *model.
},
}
}
func sanitizePath(path string) string {
path = strings.TrimSuffix(path, ".yml")
path = strings.TrimPrefix(path, ".drone/")
path = strings.TrimPrefix(path, ".")
return path
}

View file

@ -19,6 +19,7 @@ import (
"testing"
"github.com/laszlocph/drone-oss-08/model"
"github.com/laszlocph/drone-oss-08/remote"
)
func TestMultilineEnvsubst(t *testing.T) {
@ -33,17 +34,20 @@ bbb`,
Secs: []*model.Secret{},
Regs: []*model.Registry{},
Link: "",
Yamls: []string{`pipeline:
Yamls: []*remote.FileMeta{
&remote.FileMeta{Data: []byte(`
pipeline:
xxx:
image: scratch
yyy: ${DRONE_COMMIT_MESSAGE}
`, `pipeline:
`)},
&remote.FileMeta{Data: []byte(`
pipeline:
build:
image: scratch
yyy: ${DRONE_COMMIT_MESSAGE}
`,
},
}
`)},
}}
if buildItems, err := b.Build(); err != nil {
t.Fatal(err)
@ -61,15 +65,19 @@ func TestMultiPipeline(t *testing.T) {
Secs: []*model.Secret{},
Regs: []*model.Registry{},
Link: "",
Yamls: []string{`pipeline:
lint:
Yamls: []*remote.FileMeta{
&remote.FileMeta{Data: []byte(`
pipeline:
xxx:
image: scratch
yyy: ${DRONE_COMMIT_MESSAGE}
`, `pipeline:
test:
`)},
&remote.FileMeta{Data: []byte(`
pipeline:
build:
image: scratch
yyy: ${DRONE_COMMIT_MESSAGE}
`,
`)},
},
}
@ -81,3 +89,38 @@ func TestMultiPipeline(t *testing.T) {
t.Fatal("Should have generated 2 buildItems")
}
}
func TestDependsOn(t *testing.T) {
b := procBuilder{
Repo: &model.Repo{},
Curr: &model.Build{},
Last: &model.Build{},
Netrc: &model.Netrc{},
Secs: []*model.Secret{},
Regs: []*model.Registry{},
Link: "",
Yamls: []*remote.FileMeta{
&remote.FileMeta{Data: []byte(`
pipeline:
deploy:
image: scratch
depends_on:
- lint
- test
- build
`)},
},
}
buildItems, err := b.Build()
if err != nil {
t.Fatal(err)
}
if len(buildItems[0].DependsOn) != 3 {
t.Fatal("Should have 3 dependencies")
}
if buildItems[0].DependsOn[1] != "test" {
t.Fatal("Should depend on test")
}
}