hacky fixed for race conditions on cancel

This commit is contained in:
Brad Rydzewski 2016-07-13 17:33:28 -07:00
parent c27a5dd0b4
commit 71de0d9408
2 changed files with 49 additions and 5 deletions

View file

@ -3,6 +3,7 @@ package build
import ( import (
"bufio" "bufio"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
@ -20,6 +21,7 @@ type Pipeline struct {
conf *yaml.Config conf *yaml.Config
head *element head *element
tail *element tail *element
wait sync.WaitGroup
pipe chan (*Line) pipe chan (*Line)
next chan (error) next chan (error)
done chan (error) done chan (error)
@ -87,6 +89,11 @@ func (p *Pipeline) Tail() *yaml.Container {
// Stop stops the pipeline. // Stop stops the pipeline.
func (p *Pipeline) Stop() { func (p *Pipeline) Stop() {
go func() { go func() {
defer func() {
if r := recover(); r != nil {
logrus.Errorln("recover stopping the pipeline", r)
}
}()
p.done <- ErrTerm p.done <- ErrTerm
}() }()
} }
@ -98,9 +105,11 @@ func (p *Pipeline) Setup() error {
// Teardown removes the pipeline environment. // Teardown removes the pipeline environment.
func (p *Pipeline) Teardown() { func (p *Pipeline) Teardown() {
for _, id := range p.containers { for _, id := range p.containers {
p.engine.ContainerRemove(id) p.engine.ContainerRemove(id)
} }
close(p.next) close(p.next)
close(p.done) close(p.done)
@ -114,10 +123,32 @@ func (p *Pipeline) Teardown() {
func (p *Pipeline) step() { func (p *Pipeline) step() {
if p.head == p.tail { if p.head == p.tail {
go func() { go func() {
defer func() {
if r := recover(); r != nil {
logrus.Errorln("recover executing step function", r)
}
}()
// stop all containers
for _, id := range p.containers {
p.engine.ContainerStop(id)
}
// wait for all logs to terminate
// p.wait.Done() // this is for the ambassador
p.wait.Wait()
// signal completion
p.done <- nil p.done <- nil
}() }()
} else { } else {
go func() { go func() {
defer func() {
if r := recover(); r != nil {
logrus.Errorln("recover executing step to head function", r)
}
}()
p.head = p.head.next p.head = p.head.next
p.next <- nil p.next <- nil
}() }()
@ -137,17 +168,23 @@ func (p *Pipeline) close(err error) {
} }
func (p *Pipeline) exec(c *yaml.Container) error { func (p *Pipeline) exec(c *yaml.Container) error {
name, err := p.engine.ContainerStart(c) name, err := p.engine.ContainerStart(c)
if err != nil { if err != nil {
return err return err
} }
p.containers = append(p.containers, name) p.containers = append(p.containers, name)
logrus.Debugf("wait.add(1) for %s logs", name)
p.wait.Add(1)
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
logrus.Errorln("recover writing build output", r) logrus.Errorln("recover writing build output", r)
} }
logrus.Debugf("wait.done() for %s logs", name)
p.wait.Done()
}() }()
rc, rerr := p.engine.ContainerLogs(name) rc, rerr := p.engine.ContainerLogs(name)
@ -179,17 +216,16 @@ func (p *Pipeline) exec(c *yaml.Container) error {
if err != nil { if err != nil {
return err return err
} }
if state.OOMKilled {
return &OomError{c.Name}
} else if state.ExitCode != 0 {
return &ExitError{c.Name, state.ExitCode}
}
logrus.Debugf("wait.add(1) for %s exit code", name)
p.wait.Add(1)
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
logrus.Errorln("recover writing exit code to output", r) logrus.Errorln("recover writing exit code to output", r)
} }
p.wait.Done()
logrus.Debugf("wait.done() for %s exit code", name)
}() }()
p.pipe <- &Line{ p.pipe <- &Line{
@ -198,5 +234,12 @@ func (p *Pipeline) exec(c *yaml.Container) error {
Out: strconv.Itoa(state.ExitCode), Out: strconv.Itoa(state.ExitCode),
} }
}() }()
if state.OOMKilled {
return &OomError{c.Name}
} else if state.ExitCode != 0 {
return &ExitError{c.Name, state.ExitCode}
}
return nil return nil
} }

View file

@ -255,6 +255,7 @@ func PostBuild(c *gin.Context) {
build.Finished = 0 build.Finished = 0
build.Enqueued = time.Now().UTC().Unix() build.Enqueued = time.Now().UTC().Unix()
for _, job := range jobs { for _, job := range jobs {
job.Error = ""
job.Status = model.StatusPending job.Status = model.StatusPending
job.Started = 0 job.Started = 0
job.Finished = 0 job.Finished = 0