From 62ccf1bd604a74c889208185656f702c08835433 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 18 Apr 2016 12:07:01 -0700 Subject: [PATCH] added 0.5 yaml and runtime support through feature flag --- api/build.go | 31 +++ engine/compiler/compile.go | 4 +- engine/compiler/compile_test.go | 2 +- engine/compiler/transform.go | 2 +- engine/engine/engine.go | 158 +++++++++++ engine/runner/docker/context.go | 24 ++ engine/runner/docker/docker.go | 111 ++++++++ engine/runner/docker/docker_test.go | 1 + engine/runner/docker/helper.go | 49 ++++ engine/runner/docker/helper_test.go | 1 + engine/runner/docker/internal/README | 1 + engine/runner/docker/internal/stdcopy.go | 167 +++++++++++ engine/runner/docker/internal/stdcopy_test.go | 260 ++++++++++++++++++ engine/runner/docker/util.go | 102 +++++++ engine/runner/docker/util_test.go | 24 ++ engine/runner/runner.go | 6 +- web/hook.go | 31 ++- web/stream2.go | 126 +++++++++ 18 files changed, 1091 insertions(+), 9 deletions(-) create mode 100644 engine/engine/engine.go create mode 100644 engine/runner/docker/context.go create mode 100644 engine/runner/docker/docker.go create mode 100644 engine/runner/docker/docker_test.go create mode 100644 engine/runner/docker/helper.go create mode 100644 engine/runner/docker/helper_test.go create mode 100644 engine/runner/docker/internal/README create mode 100644 engine/runner/docker/internal/stdcopy.go create mode 100644 engine/runner/docker/internal/stdcopy_test.go create mode 100644 engine/runner/docker/util.go create mode 100644 engine/runner/docker/util_test.go create mode 100644 web/stream2.go diff --git a/api/build.go b/api/build.go index 75ec3730b..496662a1f 100644 --- a/api/build.go +++ b/api/build.go @@ -12,6 +12,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/drone/drone/engine" + "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" @@ -280,6 +281,36 @@ func PostBuild(c *gin.Context) { // on status change notifications last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) + + // IMPORTANT. PLEASE READ + // + // The below code uses a feature flag to switch between the current + // build engine and the exerimental 0.5 build engine. This can be + // enabled using with the environment variable CANARY=true + + if os.Getenv("CANARY") == "true" { + for _, job := range jobs { + queue.Publish(c, &queue.Work{ + User: user, + Repo: repo, + Build: build, + BuildLast: last, + Job: job, + Keys: key, + Netrc: netrc, + Yaml: string(raw), + YamlEnc: string(sec), + System: &model.System{ + Link: httputil.GetURL(c.Request), + Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "), + Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), + Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), + }, + }) + } + return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW + } + engine_ := engine.FromContext(c) go engine_.Schedule(c.Copy(), &engine.Task{ User: user, diff --git a/engine/compiler/compile.go b/engine/compiler/compile.go index 2cfe7de4b..da4e56364 100644 --- a/engine/compiler/compile.go +++ b/engine/compiler/compile.go @@ -1,4 +1,4 @@ -package libyaml +package compiler import ( "github.com/drone/drone/engine/runner" @@ -143,4 +143,4 @@ func (c *Compiler) walk(node yaml.Node) (err error) { } } return err -} \ No newline at end of file +} diff --git a/engine/compiler/compile_test.go b/engine/compiler/compile_test.go index 0160d811c..a20d4fea6 100644 --- a/engine/compiler/compile_test.go +++ b/engine/compiler/compile_test.go @@ -1 +1 @@ -package libyaml +package compiler diff --git a/engine/compiler/transform.go b/engine/compiler/transform.go index 48f4d05d8..a61087e58 100644 --- a/engine/compiler/transform.go +++ b/engine/compiler/transform.go @@ -1,4 +1,4 @@ -package libyaml +package compiler import "github.com/drone/drone/engine/compiler/parse" diff --git a/engine/engine/engine.go b/engine/engine/engine.go new file mode 100644 index 000000000..072a3a4cd --- /dev/null +++ b/engine/engine/engine.go @@ -0,0 +1,158 @@ +package engine + +import ( + "fmt" + "time" + + "github.com/Sirupsen/logrus" + "github.com/drone/drone/bus" + "github.com/drone/drone/engine/compiler" + "github.com/drone/drone/engine/runner" + "github.com/drone/drone/engine/runner/docker" + "github.com/drone/drone/model" + "github.com/drone/drone/queue" + "github.com/drone/drone/store" + "github.com/drone/drone/stream" + "golang.org/x/net/context" +) + +// Poll polls the build queue for build jobs. +func Poll(c context.Context) { + for { + pollRecover(c) + } +} + +func pollRecover(c context.Context) { + defer recover() + poll(c) +} + +func poll(c context.Context) { + w := queue.Pull(c) + + logrus.Infof("Starting build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + + rc, wc, err := stream.Create(c, stream.ToKey(w.Job.ID)) + if err != nil { + logrus.Errorf("Error opening build stream %s/%s#%d.%d. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + } + + defer func() { + wc.Close() + rc.Close() + stream.Remove(c, stream.ToKey(w.Job.ID)) + }() + + w.Job.Status = model.StatusRunning + w.Job.Started = time.Now().Unix() + + quitc := make(chan bool, 1) + eventc := make(chan *bus.Event, 1) + bus.Subscribe(c, eventc) + + compile := compiler.New() + compile.Transforms(nil) + spec, err := compile.CompileString(w.Yaml) + if err != nil { + // TODO handle error + logrus.Infof("Error compiling Yaml %s/%s#%d %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error()) + return + } + + defer func() { + bus.Unsubscribe(c, eventc) + quitc <- true + }() + + ctx := context.TODO() + ctx, cancel := context.WithCancel(ctx) + + // TODO store the started build in the database + // TODO publish the started build + store.UpdateJob(c, w.Job) + //store.Write(c, w.Job, rc) + bus.Publish(c, bus.NewEvent(bus.Started, w.Repo, w.Build, w.Job)) + + conf := runner.Config{ + Engine: docker.FromContext(c), + } + + run := conf.Runner(ctx, spec) + run.Run() + defer cancel() + + go func() { + for { + select { + case event := <-eventc: + if event.Type == bus.Cancelled && event.Job.ID == w.Job.ID { + logrus.Infof("Cancel build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + cancel() + } + case <-quitc: + return + } + } + }() + + pipe := run.Pipe() + for { + line := pipe.Next() + if line == nil { + break + } + fmt.Println(line) + } + + err = run.Wait() + + // catch the build result + if err != nil { + w.Job.ExitCode = 255 + } + if exitErr, ok := err.(*runner.ExitError); ok { + w.Job.ExitCode = exitErr.Code + } + + w.Job.Finished = time.Now().Unix() + + switch w.Job.ExitCode { + case 128, 130: + w.Job.Status = model.StatusKilled + case 0: + w.Job.Status = model.StatusSuccess + default: + w.Job.Status = model.StatusFailure + } + + // store the finished build in the database + logs, _, err := stream.Open(c, stream.ToKey(w.Job.ID)) + if err != nil { + logrus.Errorf("Error reading build stream %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + } + defer func() { + if logs != nil { + logs.Close() + } + }() + if err := store.WriteLog(c, w.Job, logs); err != nil { + logrus.Errorf("Error persisting build stream %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + } + if logs != nil { + logs.Close() + } + + // TODO publish the finished build + store.UpdateJob(c, w.Job) + bus.Publish(c, bus.NewEvent(bus.Finished, w.Repo, w.Build, w.Job)) + + logrus.Infof("Finished build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) +} diff --git a/engine/runner/docker/context.go b/engine/runner/docker/context.go new file mode 100644 index 000000000..e19ef84b2 --- /dev/null +++ b/engine/runner/docker/context.go @@ -0,0 +1,24 @@ +package docker + +import ( + "github.com/drone/drone/engine/runner" + "golang.org/x/net/context" +) + +const key = "docker" + +// Setter defines a context that enables setting values. +type Setter interface { + Set(string, interface{}) +} + +// FromContext returns the Engine associated with this context. +func FromContext(c context.Context) runner.Engine { + return c.Value(key).(runner.Engine) +} + +// ToContext adds the Engine to this context if it supports the +// Setter interface. +func ToContext(c Setter, d runner.Engine) { + c.Set(key, d) +} diff --git a/engine/runner/docker/docker.go b/engine/runner/docker/docker.go new file mode 100644 index 000000000..9e1bb4606 --- /dev/null +++ b/engine/runner/docker/docker.go @@ -0,0 +1,111 @@ +package docker + +import ( + "io" + + "github.com/drone/drone/engine/runner" + "github.com/drone/drone/engine/runner/docker/internal" + + "github.com/samalba/dockerclient" +) + +type dockerEngine struct { + client dockerclient.Client +} + +func (e *dockerEngine) ContainerStart(container *runner.Container) (string, error) { + conf := toContainerConfig(container) + auth := toAuthConfig(container) + + // pull the image if it does not exists or if the Container + // is configured to always pull a new image. + _, err := e.client.InspectImage(container.Image) + if err != nil || container.Pull { + e.client.PullImage(container.Image, auth) + } + + // create and start the container and return the Container ID. + id, err := e.client.CreateContainer(conf, container.Name, auth) + if err != nil { + return id, err + } + err = e.client.StartContainer(id, &conf.HostConfig) + if err != nil { + + // remove the container if it cannot be started + e.client.RemoveContainer(id, true, true) + return id, err + } + return id, nil +} + +func (e *dockerEngine) ContainerStop(id string) error { + e.client.StopContainer(id, 1) + e.client.KillContainer(id, "9") + return nil +} + +func (e *dockerEngine) ContainerRemove(id string) error { + e.client.StopContainer(id, 1) + e.client.KillContainer(id, "9") + e.client.RemoveContainer(id, true, true) + return nil +} + +func (e *dockerEngine) ContainerWait(id string) (*runner.State, error) { + // wait for the container to exit + // + // TODO(bradrydzewski) we should have a for loop here + // to re-connect and wait if this channel returns a + // result even though the container is still running. + // + <-e.client.Wait(id) + v, err := e.client.InspectContainer(id) + if err != nil { + return nil, err + } + return &runner.State{ + ExitCode: v.State.ExitCode, + OOMKilled: v.State.OOMKilled, + }, nil +} + +func (e *dockerEngine) ContainerLogs(id string) (io.ReadCloser, error) { + opts := &dockerclient.LogOptions{ + Follow: true, + Stdout: true, + Stderr: true, + } + + piper, pipew := io.Pipe() + go func() { + defer pipew.Close() + + // sometimes the docker logs fails due to parsing errors. this + // routine will check for such a failure and attempt to resume + // if necessary. + for i := 0; i < 5; i++ { + if i > 0 { + opts.Tail = 1 + } + + rc, err := e.client.ContainerLogs(id, opts) + if err != nil { + return + } + defer rc.Close() + + // use Docker StdCopy + internal.StdCopy(pipew, pipew, rc) + + // check to see if the container is still running. If not, + // we can safely exit and assume there are no more logs left + // to stream. + v, err := e.client.InspectContainer(id) + if err != nil || !v.State.Running { + return + } + } + }() + return piper, nil +} diff --git a/engine/runner/docker/docker_test.go b/engine/runner/docker/docker_test.go new file mode 100644 index 000000000..1cdc3ff91 --- /dev/null +++ b/engine/runner/docker/docker_test.go @@ -0,0 +1 @@ +package docker diff --git a/engine/runner/docker/helper.go b/engine/runner/docker/helper.go new file mode 100644 index 000000000..25b77f955 --- /dev/null +++ b/engine/runner/docker/helper.go @@ -0,0 +1,49 @@ +package docker + +import ( + "os" + + "github.com/drone/drone/engine/runner" + "github.com/samalba/dockerclient" +) + +var ( + dockerHost = os.Getenv("DOCKER_HOST") + dockerCert = os.Getenv("DOCKER_CERT_PATH") + dockerTLS = os.Getenv("DOCKER_TLS_VERIFY") +) + +func init() { + if dockerHost == "" { + dockerHost = "unix:///var/run/docker.sock" + } +} + +// New returns a new Docker engine using the provided Docker client. +func New(client dockerclient.Client) runner.Engine { + return &dockerEngine{client} +} + +// NewEnv returns a new Docker engine from the DOCKER_HOST and DOCKER_CERT_PATH +// environment variables. +func NewEnv() (runner.Engine, error) { + config, err := dockerclient.TLSConfigFromCertPath(dockerCert) + if err == nil && dockerTLS != "1" { + config.InsecureSkipVerify = true + } + client, err := dockerclient.NewDockerClient(dockerHost, config) + if err != nil { + return nil, err + } + return New(client), nil +} + +// MustEnv returns a new Docker engine from the DOCKER_HOST and DOCKER_CERT_PATH +// environment variables. Errors creating the Docker engine will panic. +func MustEnv() runner.Engine { + engine, err := NewEnv() + if err != nil { + panic(err) + } + return engine +} diff --git a/engine/runner/docker/helper_test.go b/engine/runner/docker/helper_test.go new file mode 100644 index 000000000..1cdc3ff91 --- /dev/null +++ b/engine/runner/docker/helper_test.go @@ -0,0 +1 @@ +package docker diff --git a/engine/runner/docker/internal/README b/engine/runner/docker/internal/README new file mode 100644 index 000000000..2bd3e9830 --- /dev/null +++ b/engine/runner/docker/internal/README @@ -0,0 +1 @@ +This is an internal copy of the Docker stdcopy package that removes the logrus debug logging. The original package is found at https://github.com/docker/docker/tree/master/pkg/stdcopy \ No newline at end of file diff --git a/engine/runner/docker/internal/stdcopy.go b/engine/runner/docker/internal/stdcopy.go new file mode 100644 index 000000000..db61b0c88 --- /dev/null +++ b/engine/runner/docker/internal/stdcopy.go @@ -0,0 +1,167 @@ +package internal + +import ( + "encoding/binary" + "errors" + "fmt" + "io" +) + +// StdType is the type of standard stream +// a writer can multiplex to. +type StdType byte + +const ( + // Stdin represents standard input stream type. + Stdin StdType = iota + // Stdout represents standard output stream type. + Stdout + // Stderr represents standard error steam type. + Stderr + + stdWriterPrefixLen = 8 + stdWriterFdIndex = 0 + stdWriterSizeIndex = 4 + + startingBufLen = 32*1024 + stdWriterPrefixLen + 1 +) + +// stdWriter is wrapper of io.Writer with extra customized info. +type stdWriter struct { + io.Writer + prefix byte +} + +// Write sends the buffer to the underneath writer. +// It insert the prefix header before the buffer, +// so stdcopy.StdCopy knows where to multiplex the output. +// It makes stdWriter to implement io.Writer. +func (w *stdWriter) Write(buf []byte) (n int, err error) { + if w == nil || w.Writer == nil { + return 0, errors.New("Writer not instantiated") + } + if buf == nil { + return 0, nil + } + + header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} + binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(buf))) + + line := append(header[:], buf...) + + n, err = w.Writer.Write(line) + n -= stdWriterPrefixLen + + if n < 0 { + n = 0 + } + return +} + +// NewStdWriter instantiates a new Writer. +// Everything written to it will be encapsulated using a custom format, +// and written to the underlying `w` stream. +// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. +// `t` indicates the id of the stream to encapsulate. +// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. +func NewStdWriter(w io.Writer, t StdType) io.Writer { + return &stdWriter{ + Writer: w, + prefix: byte(t), + } +} + +// StdCopy is a modified version of io.Copy. +// +// StdCopy will demultiplex `src`, assuming that it contains two streams, +// previously multiplexed together using a StdWriter instance. +// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. +// +// StdCopy will read until it hits EOF on `src`. It will then return a nil error. +// In other words: if `err` is non nil, it indicates a real underlying error. +// +// `written` will hold the total number of bytes written to `dstout` and `dsterr`. +func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) { + var ( + buf = make([]byte, startingBufLen) + bufLen = len(buf) + nr, nw int + er, ew error + out io.Writer + frameSize int + ) + + for { + // Make sure we have at least a full header + for nr < stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + // Check the first byte to know where to write + switch StdType(buf[stdWriterFdIndex]) { + case Stdin: + fallthrough + case Stdout: + // Write on stdout + out = dstout + case Stderr: + // Write on stderr + out = dsterr + default: + return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) + } + + // Retrieve the size of the frame + frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) + + // Check if the buffer is big enough to read the frame. + // Extend it if necessary. + if frameSize+stdWriterPrefixLen > bufLen { + buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) + bufLen = len(buf) + } + + // While the amount of bytes read is less than the size of the frame + header, we keep reading + for nr < frameSize+stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < frameSize+stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + // Write the retrieved frame (without header) + nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) + if ew != nil { + return 0, ew + } + // If the frame has not been fully written: error + if nw != frameSize { + return 0, io.ErrShortWrite + } + written += int64(nw) + + // Move the rest of the buffer to the beginning + copy(buf, buf[frameSize+stdWriterPrefixLen:]) + // Move the index + nr -= frameSize + stdWriterPrefixLen + } +} diff --git a/engine/runner/docker/internal/stdcopy_test.go b/engine/runner/docker/internal/stdcopy_test.go new file mode 100644 index 000000000..7a443bb8b --- /dev/null +++ b/engine/runner/docker/internal/stdcopy_test.go @@ -0,0 +1,260 @@ +package internal + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "strings" + "testing" +) + +func TestNewStdWriter(t *testing.T) { + writer := NewStdWriter(ioutil.Discard, Stdout) + if writer == nil { + t.Fatalf("NewStdWriter with an invalid StdType should not return nil.") + } +} + +func TestWriteWithUnitializedStdWriter(t *testing.T) { + writer := stdWriter{ + Writer: nil, + prefix: byte(Stdout), + } + n, err := writer.Write([]byte("Something here")) + if n != 0 || err == nil { + t.Fatalf("Should fail when given an uncomplete or uninitialized StdWriter") + } +} + +func TestWriteWithNilBytes(t *testing.T) { + writer := NewStdWriter(ioutil.Discard, Stdout) + n, err := writer.Write(nil) + if err != nil { + t.Fatalf("Shouldn't have fail when given no data") + } + if n > 0 { + t.Fatalf("Write should have written 0 byte, but has written %d", n) + } +} + +func TestWrite(t *testing.T) { + writer := NewStdWriter(ioutil.Discard, Stdout) + data := []byte("Test StdWrite.Write") + n, err := writer.Write(data) + if err != nil { + t.Fatalf("Error while writing with StdWrite") + } + if n != len(data) { + t.Fatalf("Write should have written %d byte but wrote %d.", len(data), n) + } +} + +type errWriter struct { + n int + err error +} + +func (f *errWriter) Write(buf []byte) (int, error) { + return f.n, f.err +} + +func TestWriteWithWriterError(t *testing.T) { + expectedError := errors.New("expected") + expectedReturnedBytes := 10 + writer := NewStdWriter(&errWriter{ + n: stdWriterPrefixLen + expectedReturnedBytes, + err: expectedError}, Stdout) + data := []byte("This won't get written, sigh") + n, err := writer.Write(data) + if err != expectedError { + t.Fatalf("Didn't get expected error.") + } + if n != expectedReturnedBytes { + t.Fatalf("Didn't get expected written bytes %d, got %d.", + expectedReturnedBytes, n) + } +} + +func TestWriteDoesNotReturnNegativeWrittenBytes(t *testing.T) { + writer := NewStdWriter(&errWriter{n: -1}, Stdout) + data := []byte("This won't get written, sigh") + actual, _ := writer.Write(data) + if actual != 0 { + t.Fatalf("Expected returned written bytes equal to 0, got %d", actual) + } +} + +func getSrcBuffer(stdOutBytes, stdErrBytes []byte) (buffer *bytes.Buffer, err error) { + buffer = new(bytes.Buffer) + dstOut := NewStdWriter(buffer, Stdout) + _, err = dstOut.Write(stdOutBytes) + if err != nil { + return + } + dstErr := NewStdWriter(buffer, Stderr) + _, err = dstErr.Write(stdErrBytes) + return +} + +func TestStdCopyWriteAndRead(t *testing.T) { + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + written, err := StdCopy(ioutil.Discard, ioutil.Discard, buffer) + if err != nil { + t.Fatal(err) + } + expectedTotalWritten := len(stdOutBytes) + len(stdErrBytes) + if written != int64(expectedTotalWritten) { + t.Fatalf("Expected to have total of %d bytes written, got %d", expectedTotalWritten, written) + } +} + +type customReader struct { + n int + err error + totalCalls int + correctCalls int + src *bytes.Buffer +} + +func (f *customReader) Read(buf []byte) (int, error) { + f.totalCalls++ + if f.totalCalls <= f.correctCalls { + return f.src.Read(buf) + } + return f.n, f.err +} + +func TestStdCopyReturnsErrorReadingHeader(t *testing.T) { + expectedError := errors.New("error") + reader := &customReader{ + err: expectedError} + written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) + if written != 0 { + t.Fatalf("Expected 0 bytes read, got %d", written) + } + if err != expectedError { + t.Fatalf("Didn't get expected error") + } +} + +func TestStdCopyReturnsErrorReadingFrame(t *testing.T) { + expectedError := errors.New("error") + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + reader := &customReader{ + correctCalls: 1, + n: stdWriterPrefixLen + 1, + err: expectedError, + src: buffer} + written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) + if written != 0 { + t.Fatalf("Expected 0 bytes read, got %d", written) + } + if err != expectedError { + t.Fatalf("Didn't get expected error") + } +} + +func TestStdCopyDetectsCorruptedFrame(t *testing.T) { + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + reader := &customReader{ + correctCalls: 1, + n: stdWriterPrefixLen + 1, + err: io.EOF, + src: buffer} + written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader) + if written != startingBufLen { + t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written) + } + if err != nil { + t.Fatal("Didn't get nil error") + } +} + +func TestStdCopyWithInvalidInputHeader(t *testing.T) { + dstOut := NewStdWriter(ioutil.Discard, Stdout) + dstErr := NewStdWriter(ioutil.Discard, Stderr) + src := strings.NewReader("Invalid input") + _, err := StdCopy(dstOut, dstErr, src) + if err == nil { + t.Fatal("StdCopy with invalid input header should fail.") + } +} + +func TestStdCopyWithCorruptedPrefix(t *testing.T) { + data := []byte{0x01, 0x02, 0x03} + src := bytes.NewReader(data) + written, err := StdCopy(nil, nil, src) + if err != nil { + t.Fatalf("StdCopy should not return an error with corrupted prefix.") + } + if written != 0 { + t.Fatalf("StdCopy should have written 0, but has written %d", written) + } +} + +func TestStdCopyReturnsWriteErrors(t *testing.T) { + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + expectedError := errors.New("expected") + + dstOut := &errWriter{err: expectedError} + + written, err := StdCopy(dstOut, ioutil.Discard, buffer) + if written != 0 { + t.Fatalf("StdCopy should have written 0, but has written %d", written) + } + if err != expectedError { + t.Fatalf("Didn't get expected error, got %v", err) + } +} + +func TestStdCopyDetectsNotFullyWrittenFrames(t *testing.T) { + stdOutBytes := []byte(strings.Repeat("o", startingBufLen)) + stdErrBytes := []byte(strings.Repeat("e", startingBufLen)) + buffer, err := getSrcBuffer(stdOutBytes, stdErrBytes) + if err != nil { + t.Fatal(err) + } + dstOut := &errWriter{n: startingBufLen - 10} + + written, err := StdCopy(dstOut, ioutil.Discard, buffer) + if written != 0 { + t.Fatalf("StdCopy should have return 0 written bytes, but returned %d", written) + } + if err != io.ErrShortWrite { + t.Fatalf("Didn't get expected io.ErrShortWrite error") + } +} + +func BenchmarkWrite(b *testing.B) { + w := NewStdWriter(ioutil.Discard, Stdout) + data := []byte("Test line for testing stdwriter performance\n") + data = bytes.Repeat(data, 100) + b.SetBytes(int64(len(data))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := w.Write(data); err != nil { + b.Fatal(err) + } + } +} diff --git a/engine/runner/docker/util.go b/engine/runner/docker/util.go new file mode 100644 index 000000000..2d35fee7c --- /dev/null +++ b/engine/runner/docker/util.go @@ -0,0 +1,102 @@ +package docker + +import ( + "fmt" + "strings" + + "github.com/drone/drone/engine/runner" + "github.com/samalba/dockerclient" +) + +// helper function that converts the Continer data structure to the exepcted +// dockerclient.ContainerConfig. +func toContainerConfig(c *runner.Container) *dockerclient.ContainerConfig { + config := &dockerclient.ContainerConfig{ + Image: c.Image, + Env: toEnvironmentSlice(c.Environment), + Cmd: c.Command, + Entrypoint: c.Entrypoint, + WorkingDir: c.WorkingDir, + HostConfig: dockerclient.HostConfig{ + Privileged: c.Privileged, + NetworkMode: c.Network, + Memory: c.MemLimit, + CpuShares: c.CPUShares, + CpuQuota: c.CPUQuota, + CpusetCpus: c.CPUSet, + MemorySwappiness: -1, + OomKillDisable: c.OomKillDisable, + }, + } + + if len(config.Entrypoint) == 0 { + config.Entrypoint = nil + } + if len(config.Cmd) == 0 { + config.Cmd = nil + } + if len(c.ExtraHosts) > 0 { + config.HostConfig.ExtraHosts = c.ExtraHosts + } + if len(c.DNS) != 0 { + config.HostConfig.Dns = c.DNS + } + if len(c.DNSSearch) != 0 { + config.HostConfig.DnsSearch = c.DNSSearch + } + if len(c.VolumesFrom) != 0 { + config.HostConfig.VolumesFrom = c.VolumesFrom + } + + config.Volumes = map[string]struct{}{} + for _, path := range c.Volumes { + if strings.Index(path, ":") == -1 { + config.Volumes[path] = struct{}{} + continue + } + parts := strings.Split(path, ":") + config.Volumes[parts[1]] = struct{}{} + config.HostConfig.Binds = append(config.HostConfig.Binds, path) + } + + for _, path := range c.Devices { + if strings.Index(path, ":") == -1 { + continue + } + parts := strings.Split(path, ":") + device := dockerclient.DeviceMapping{ + PathOnHost: parts[0], + PathInContainer: parts[1], + CgroupPermissions: "rwm", + } + config.HostConfig.Devices = append(config.HostConfig.Devices, device) + } + + return config +} + +// helper function that converts the AuthConfig data structure to the exepcted +// dockerclient.AuthConfig. +func toAuthConfig(container *runner.Container) *dockerclient.AuthConfig { + if container.AuthConfig.Username == "" && + container.AuthConfig.Password == "" && + container.AuthConfig.Token == "" { + return nil + } + return &dockerclient.AuthConfig{ + Email: container.AuthConfig.Email, + Username: container.AuthConfig.Username, + Password: container.AuthConfig.Password, + RegistryToken: container.AuthConfig.Token, + } +} + +// helper function that converts a key value map of environment variables to a +// string slice in key=value format. +func toEnvironmentSlice(env map[string]string) []string { + var envs []string + for k, v := range env { + envs = append(envs, fmt.Sprintf("%s=%s", k, v)) + } + return envs +} diff --git a/engine/runner/docker/util_test.go b/engine/runner/docker/util_test.go new file mode 100644 index 000000000..1a4a8ce3c --- /dev/null +++ b/engine/runner/docker/util_test.go @@ -0,0 +1,24 @@ +package docker + +import ( + "testing" +) + +func Test_toContainerConfig(t *testing.T) { + t.Skip() +} + +func Test_toAuthConfig(t *testing.T) { + t.Skip() +} + +func Test_toEnvironmentSlice(t *testing.T) { + env := map[string]string{ + "HOME": "/root", + } + envs := toEnvironmentSlice(env) + want, got := "HOME=/root", envs[0] + if want != got { + t.Errorf("Wanted envar %s got %s", want, got) + } +} diff --git a/engine/runner/runner.go b/engine/runner/runner.go index 8085c214b..7bad01840 100644 --- a/engine/runner/runner.go +++ b/engine/runner/runner.go @@ -2,8 +2,8 @@ package runner import ( "bufio" - "time" "fmt" + "time" "github.com/drone/drone/engine/runner/parse" @@ -59,7 +59,7 @@ type Runner struct { // Run starts the build runner but does not wait for it to complete. The Wait // method will return the exit code and release associated resources once the // running containers exit. -func (r *Runner) Run() error { +func (r *Runner) Run() { go func() { r.setup() @@ -74,8 +74,6 @@ func (r *Runner) Run() error { <-r.ctx.Done() r.cancel() }() - - return nil } // Wait waits for the runner to exit. diff --git a/web/hook.go b/web/hook.go index a476e5f13..cfde989a9 100644 --- a/web/hook.go +++ b/web/hook.go @@ -17,6 +17,7 @@ import ( "github.com/drone/drone/shared/httputil" "github.com/drone/drone/shared/token" "github.com/drone/drone/store" + "github.com/drone/drone/queue" ) var ( @@ -204,6 +205,35 @@ func PostHook(c *gin.Context) { // on status change notifications last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) + // IMPORTANT. PLEASE READ + // + // The below code uses a feature flag to switch between the current + // build engine and the exerimental 0.5 build engine. This can be + // enabled using with the environment variable CANARY=true + + if os.Getenv("CANARY") == "true" { + for _, job := range jobs { + queue.Publish(c, &queue.Work{ + User: user, + Repo: repo, + Build: build, + BuildLast: last, + Job: job, + Keys: key, + Netrc: netrc, + Yaml: string(raw), + YamlEnc: string(sec), + System: &model.System{ + Link: httputil.GetURL(c.Request), + Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "), + Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "), + Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), + }, + }) + } + return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW + } + engine_ := engine.FromContext(c) go engine_.Schedule(c.Copy(), &engine.Task{ User: user, @@ -222,5 +252,4 @@ func PostHook(c *gin.Context) { Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "), }, }) - } diff --git a/web/stream2.go b/web/stream2.go new file mode 100644 index 000000000..ec245f598 --- /dev/null +++ b/web/stream2.go @@ -0,0 +1,126 @@ +package web + +import ( + "bufio" + "encoding/json" + "io" + "strconv" + + "github.com/gin-gonic/gin" + + "github.com/drone/drone/bus" + "github.com/drone/drone/model" + "github.com/drone/drone/router/middleware/session" + "github.com/drone/drone/store" + "github.com/drone/drone/stream" + + log "github.com/Sirupsen/logrus" + + "github.com/manucorporat/sse" +) + +// IMPORTANT. PLEASE READ +// +// This file containers experimental streaming features for the 0.5 +// release. These can be enabled with the feature flag CANARY=true + +// GetRepoEvents will upgrade the connection to a Websocket and will stream +// event updates to the browser. +func GetRepoEvents2(c *gin.Context) { + repo := session.Repo(c) + c.Writer.Header().Set("Content-Type", "text/event-stream") + + eventc := make(chan *bus.Event, 1) + bus.Subscribe(c, eventc) + defer func() { + bus.Unsubscribe(c, eventc) + close(eventc) + log.Infof("closed event stream") + }() + + c.Stream(func(w io.Writer) bool { + select { + case event := <-eventc: + if event == nil { + log.Infof("nil event received") + return false + } + + if event.Repo.FullName == repo.FullName { + + var payload = struct { + model.Build + Jobs []*model.Job `json:"jobs"` + }{} + payload.Build = event.Build + payload.Jobs, _ = store.GetJobList(c, &event.Build) + data, _ := json.Marshal(&payload) + + sse.Encode(w, sse.Event{ + Event: "message", + Data: string(data), + }) + } + case <-c.Writer.CloseNotify(): + return false + } + return true + }) +} + +func GetStream2(c *gin.Context) { + + repo := session.Repo(c) + buildn, _ := strconv.Atoi(c.Param("build")) + jobn, _ := strconv.Atoi(c.Param("number")) + + c.Writer.Header().Set("Content-Type", "text/event-stream") + + build, err := store.GetBuildNumber(c, repo, buildn) + if err != nil { + log.Debugln("stream cannot get build number.", err) + c.AbortWithError(404, err) + return + } + job, err := store.GetJobNumber(c, build, jobn) + if err != nil { + log.Debugln("stream cannot get job number.", err) + c.AbortWithError(404, err) + return + } + + rc, wc, err := stream.Open(c, stream.ToKey(job.ID)) + if err != nil { + c.AbortWithError(404, err) + return + } + + defer func() { + if wc != nil { + wc.Close() + } + if rc != nil { + rc.Close() + } + }() + + go func() { + <-c.Writer.CloseNotify() + rc.Close() + }() + + var line int + var scanner = bufio.NewScanner(rc) + for scanner.Scan() { + line++ + var err = sse.Encode(c.Writer, sse.Event{ + Id: strconv.Itoa(line), + Event: "message", + Data: scanner.Text(), + }) + if err != nil { + break + } + c.Writer.Flush() + } +}