diff --git a/Dockerfile b/Dockerfile index de1310eca..9a634564d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,6 @@ # Build the drone executable on a x64 Linux host: # -# go build --ldflags '-extldflags "-static"' -o drone_static -# -# -# Alternate command for Go 1.4 and older: -# -# go build -a -tags netgo --ldflags '-extldflags "-static"' -o drone_static -# +# go build --ldflags '-extldflags "-static"' -o drone # # Build the docker image: # diff --git a/api/queue.go b/api/queue.go index 4147b58b3..269f81e82 100644 --- a/api/queue.go +++ b/api/queue.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strconv" + "sync" "github.com/Sirupsen/logrus" "github.com/drone/drone/bus" @@ -96,6 +97,13 @@ func Update(c *gin.Context) { store.UpdateBuild(c, build) } + if job.Status == model.StatusRunning { + err := stream.Create(c, stream.ToKey(job.ID)) + if err != nil { + logrus.Errorf("Unable to create stream. %s", err) + } + } + ok, err := store.UpdateBuildJob(c, build, job) if err != nil { c.String(500, "Unable to update job. %s", err) @@ -132,32 +140,38 @@ func Stream(c *gin.Context) { return } - key := stream.ToKey(id) - rc, wc, err := stream.Create(c, key) + key := c.Param("id") + logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) + + wc, err := stream.Writer(c, key) if err != nil { - logrus.Errorf("Agent %s failed to create stream. %s.", c.ClientIP(), err) + c.String(500, "Failed to create stream writer. %s", err) return } defer func() { wc.Close() - rc.Close() - stream.Remove(c, key) + stream.Delete(c, key) }() io.Copy(wc, c.Request.Body) - wc.Close() - rcc, _, err := stream.Open(c, key) + rc, err := stream.Reader(c, key) if err != nil { - logrus.Errorf("Agent %s failed to read cache. %s.", c.ClientIP(), err) + c.String(500, "Failed to create stream reader. %s", err) return } - defer func() { - rcc.Close() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer recover() + store.WriteLog(c, &model.Job{ID: id}, rc) + wg.Done() }() - store.WriteLog(c, &model.Job{ID: id}, rcc) + wg.Wait() c.String(200, "") logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) diff --git a/drone/agent/exec.go b/drone/agent/exec.go index 2a3eafba4..0bab31c28 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -195,10 +195,11 @@ func (r *pipeline) run() error { w.Job.Status = model.StatusFailure } + pushRetry(r.drone, w) + logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - pushRetry(r.drone, w) return nil } diff --git a/router/router.go b/router/router.go index efa06160e..157e6f18e 100644 --- a/router/router.go +++ b/router/router.go @@ -164,12 +164,14 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler { queue := e.Group("/api/queue") { - queue.Use(middleware.AgentMust()) - queue.POST("/pull", api.Pull) - queue.POST("/pull/:os/:arch", api.Pull) - queue.POST("/wait/:id", api.Wait) - queue.POST("/stream/:id", api.Stream) - queue.POST("/status/:id", api.Update) + if os.Getenv("CANARY") == "true" { + queue.Use(middleware.AgentMust()) + queue.POST("/pull", api.Pull) + queue.POST("/pull/:os/:arch", api.Pull) + queue.POST("/wait/:id", api.Wait) + queue.POST("/stream/:id", api.Stream) + queue.POST("/status/:id", api.Update) + } } gitlab := e.Group("/gitlab/:owner/:name") diff --git a/stream/context.go b/stream/context.go index 9b89accf1..e1202cd1b 100644 --- a/stream/context.go +++ b/stream/context.go @@ -9,13 +9,13 @@ type Setter interface { Set(string, interface{}) } -// FromContext returns the Mux associated with this context. -func FromContext(c context.Context) Mux { - return c.Value(key).(Mux) +// FromContext returns the Stream associated with this context. +func FromContext(c context.Context) Stream { + return c.Value(key).(Stream) } -// ToContext adds the Mux to this context if it supports -// the Setter interface. -func ToContext(c Setter, m Mux) { - c.Set(key, m) +// ToContext adds the Stream to this context if it supports the +// Setter interface. +func ToContext(c Setter, s Stream) { + c.Set(key, s) } diff --git a/stream/reader.go b/stream/reader.go new file mode 100644 index 000000000..dade6d7e9 --- /dev/null +++ b/stream/reader.go @@ -0,0 +1,44 @@ +package stream + +import ( + "bytes" + "io" +) + +type reader struct { + w *writer + off int +} + +// Read reads from the Buffer +func (r *reader) Read(p []byte) (n int, err error) { + r.w.RLock() + defer r.w.RUnlock() + + var m int + + for len(p) > 0 { + + m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p) + n += m + r.off += n + + if n > 0 { + break + } + + if r.w.Closed() { + err = io.EOF + break + } + + r.w.Wait() + } + + return +} + +func (r *reader) Close() error { + // TODO close should remove reader from the parent! + return nil +} diff --git a/stream/stream.go b/stream/stream.go index 787ea48e2..83cd3bc4d 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -1,7 +1,5 @@ package stream -//go:generate mockery -name Mux -output mock -case=underscore - import ( "bufio" "io" @@ -10,43 +8,32 @@ import ( "golang.org/x/net/context" ) -// Mux defines a stream multiplexer -type Mux interface { - // Create creates and returns a new stream identified by - // the specified key. - Create(key string) (io.ReadCloser, io.WriteCloser, error) - - // Open returns the existing stream by key. If the stream - // does not exist an error is returned. - Open(key string) (io.ReadCloser, io.WriteCloser, error) - - // Remove deletes the stream by key. - Remove(key string) error - - // Exists return true if the stream exists. - Exists(key string) bool +// Stream manages the stream of build logs. +type Stream interface { + Create(string) error + Delete(string) error + Reader(string) (io.Reader, error) + Writer(string) (io.WriteCloser, error) } -// Create creates and returns a new stream identified -// by the specified key. -func Create(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) { +// Create creates a new stream. +func Create(c context.Context, key string) error { return FromContext(c).Create(key) } -// Open returns the existing stream by key. If the stream does -// not exist an error is returned. -func Open(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) { - return FromContext(c).Open(key) +// Reader opens the stream for reading. +func Reader(c context.Context, key string) (io.Reader, error) { + return FromContext(c).Reader(key) } -// Exists return true if the stream exists. -func Exists(c context.Context, key string) bool { - return FromContext(c).Exists(key) +// Writer opens the stream for writing. +func Writer(c context.Context, key string) (io.WriteCloser, error) { + return FromContext(c).Writer(key) } -// Remove deletes the stream by key. -func Remove(c context.Context, key string) error { - return FromContext(c).Remove(key) +// Delete deletes the stream by key. +func Delete(c context.Context, key string) error { + return FromContext(c).Delete(key) } // ToKey is a helper function that converts a unique identifier @@ -55,9 +42,9 @@ func ToKey(i int64) string { return strconv.FormatInt(i, 10) } -// Copy copies the stream from the source to the destination in -// valid JSON format. This converts the logs, which are per-line -// JSON objects, to a JSON array. +// Copy copies the stream from the source to the destination in valid JSON +// format. This converts the logs, which are per-line JSON objects, to a +// proper JSON array. func Copy(dest io.Writer, src io.Reader) error { io.WriteString(dest, "[") diff --git a/stream/stream_impl.go b/stream/stream_impl.go index a25a3b155..5411e26bb 100644 --- a/stream/stream_impl.go +++ b/stream/stream_impl.go @@ -1,96 +1,71 @@ package stream import ( + "fmt" "io" "sync" - - "github.com/djherbis/fscache" ) -var noexp fscache.Reaper +type stream struct { + sync.Mutex + writers map[string]*writer +} -// New creates a new Mux using an in-memory filesystem. -func New() Mux { - fs := fscache.NewMemFs() - c, err := fscache.NewCache(fs, noexp) - if err != nil { - panic(err) +// New returns a new in-memory implementation of Stream. +func New() Stream { + return &stream{ + writers: map[string]*writer{}, } - return &mux{c} } -// New creates a new Mux using a persistent filesystem. -func NewFileSystem(path string) Mux { - fs, err := fscache.NewFs(path, 0777) - if err != nil { - panic(err) +// Reader returns an io.Reader for reading from to the stream. +func (s *stream) Reader(name string) (io.Reader, error) { + s.Lock() + defer s.Unlock() + + if !s.exists(name) { + return nil, fmt.Errorf("stream: cannot read stream %s, not found", name) } - c, err := fscache.NewCache(fs, noexp) - if err != nil { - panic(err) + return s.writers[name].Reader() +} + +// Writer returns an io.WriteCloser for writing to the stream. +func (s *stream) Writer(name string) (io.WriteCloser, error) { + s.Lock() + defer s.Unlock() + + if !s.exists(name) { + return nil, fmt.Errorf("stream: cannot write stream %s, not found", name) } - return &mux{c} + return s.writers[name], nil } -// mux wraps the default fscache.Cache to match the -// defined interface and to wrap the ReadCloser and -// WriteCloser to avoid panics when we over-aggressively -// close streams. -type mux struct { - cache fscache.Cache -} +// Create creates a new stream. +func (s *stream) Create(name string) error { + s.Lock() + defer s.Unlock() -func (m *mux) Create(key string) (io.ReadCloser, io.WriteCloser, error) { - rc, wc, err := m.cache.Get(key) - if rc != nil { - rc = &closeOnceReader{ReaderAt: rc, ReadCloser: rc} + if s.exists(name) { + return fmt.Errorf("stream: cannot create stream %s, already exists", name) } - if wc != nil { - wc = &closeOnceWriter{WriteCloser: wc} - } - return rc, wc, err -} -func (m *mux) Open(key string) (io.ReadCloser, io.WriteCloser, error) { - return m.Create(key) -} - -func (m *mux) Exists(key string) bool { - return m.cache.Exists(key) -} -func (m *mux) Remove(key string) error { - return m.cache.Remove(key) -} - -// closeOnceReader is a helper function that ensures -// the reader is only closed once. This is because -// attempting to close the fscache reader more than -// once results in a panic. -type closeOnceReader struct { - io.ReaderAt - io.ReadCloser - once sync.Once -} - -func (c *closeOnceReader) Close() error { - c.once.Do(func() { - c.ReadCloser.Close() - }) + s.writers[name] = newWriter() return nil } -// closeOnceWriter is a helper function that ensures -// the writer is only closed once. This is because -// attempting to close the fscache writer more than -// once results in a panic. -type closeOnceWriter struct { - io.WriteCloser - once sync.Once +// Delete deletes the stream by key. +func (s *stream) Delete(name string) error { + s.Lock() + defer s.Unlock() + + if !s.exists(name) { + return fmt.Errorf("stream: cannot delete stream %s, not found", name) + } + delete(s.writers, name) + return s.writers[name].Close() } -func (c *closeOnceWriter) Close() error { - c.once.Do(func() { - c.WriteCloser.Close() - }) - return nil +func (s *stream) exists(name string) bool { + _, exists := s.writers[name] + return exists } diff --git a/stream/stream_impl_test.go b/stream/stream_impl_test.go deleted file mode 100644 index 11541cc2b..000000000 --- a/stream/stream_impl_test.go +++ /dev/null @@ -1 +0,0 @@ -package stream diff --git a/stream/writer.go b/stream/writer.go new file mode 100644 index 000000000..0cb827a12 --- /dev/null +++ b/stream/writer.go @@ -0,0 +1,52 @@ +package stream + +import ( + "bytes" + "io" + "sync" + "sync/atomic" +) + +type writer struct { + sync.RWMutex + *sync.Cond + + buffer bytes.Buffer + closed uint32 +} + +func newWriter() *writer { + var w writer + w.Cond = sync.NewCond(w.RWMutex.RLocker()) + return &w +} + +func (w *writer) Write(p []byte) (n int, err error) { + defer w.Broadcast() + w.Lock() + defer w.Unlock() + if w.Closed() { + return 0, io.EOF + } + return w.buffer.Write(p) +} + +func (w *writer) Reader() (io.Reader, error) { + return &reader{w: w}, nil +} + +func (w *writer) Wait() { + if !w.Closed() { + w.Cond.Wait() + } +} + +func (w *writer) Close() error { + atomic.StoreUint32(&w.closed, 1) + w.Cond.Broadcast() + return nil +} + +func (w *writer) Closed() bool { + return atomic.LoadUint32(&w.closed) == 1 +} diff --git a/web/stream2.go b/web/stream2.go index 039b5dfff..c5fff2ad1 100644 --- a/web/stream2.go +++ b/web/stream2.go @@ -91,24 +91,15 @@ func GetStream2(c *gin.Context) { return } - rc, wc, err := stream.Open(c, stream.ToKey(job.ID)) + rc, err := stream.Reader(c, stream.ToKey(job.ID)) if err != nil { c.AbortWithError(404, err) return } - defer func() { - if wc != nil { - wc.Close() - } - if rc != nil { - rc.Close() - } - }() - go func() { <-c.Writer.CloseNotify() - rc.Close() + // rc.Close() }() var line int @@ -125,4 +116,6 @@ func GetStream2(c *gin.Context) { } c.Writer.Flush() } + + log.Debugf("Closed stream %s#%d", repo.FullName, build.Number) }