created custom stream implementation

This commit is contained in:
Brad Rydzewski 2016-04-22 17:35:32 -07:00
parent 5ca35c4c63
commit 34c3c92ec3
11 changed files with 210 additions and 149 deletions

View file

@ -1,12 +1,6 @@
# Build the drone executable on a x64 Linux host: # Build the drone executable on a x64 Linux host:
# #
# go build --ldflags '-extldflags "-static"' -o drone_static # go build --ldflags '-extldflags "-static"' -o drone
#
#
# Alternate command for Go 1.4 and older:
#
# go build -a -tags netgo --ldflags '-extldflags "-static"' -o drone_static
#
# #
# Build the docker image: # Build the docker image:
# #

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"strconv" "strconv"
"sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/drone/drone/bus" "github.com/drone/drone/bus"
@ -96,6 +97,13 @@ func Update(c *gin.Context) {
store.UpdateBuild(c, build) store.UpdateBuild(c, build)
} }
if job.Status == model.StatusRunning {
err := stream.Create(c, stream.ToKey(job.ID))
if err != nil {
logrus.Errorf("Unable to create stream. %s", err)
}
}
ok, err := store.UpdateBuildJob(c, build, job) ok, err := store.UpdateBuildJob(c, build, job)
if err != nil { if err != nil {
c.String(500, "Unable to update job. %s", err) c.String(500, "Unable to update job. %s", err)
@ -132,32 +140,38 @@ func Stream(c *gin.Context) {
return return
} }
key := stream.ToKey(id) key := c.Param("id")
rc, wc, err := stream.Create(c, key) logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key)
wc, err := stream.Writer(c, key)
if err != nil { if err != nil {
logrus.Errorf("Agent %s failed to create stream. %s.", c.ClientIP(), err) c.String(500, "Failed to create stream writer. %s", err)
return return
} }
defer func() { defer func() {
wc.Close() wc.Close()
rc.Close() stream.Delete(c, key)
stream.Remove(c, key)
}() }()
io.Copy(wc, c.Request.Body) io.Copy(wc, c.Request.Body)
wc.Close()
rcc, _, err := stream.Open(c, key) rc, err := stream.Reader(c, key)
if err != nil { if err != nil {
logrus.Errorf("Agent %s failed to read cache. %s.", c.ClientIP(), err) c.String(500, "Failed to create stream reader. %s", err)
return return
} }
defer func() {
rcc.Close() wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer recover()
store.WriteLog(c, &model.Job{ID: id}, rc)
wg.Done()
}() }()
store.WriteLog(c, &model.Job{ID: id}, rcc) wg.Wait()
c.String(200, "") c.String(200, "")
logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())

View file

@ -195,10 +195,11 @@ func (r *pipeline) run() error {
w.Job.Status = model.StatusFailure w.Job.Status = model.StatusFailure
} }
pushRetry(r.drone, w)
logrus.Infof("Finished build %s/%s#%d.%d", logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
pushRetry(r.drone, w)
return nil return nil
} }

View file

@ -164,6 +164,7 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
queue := e.Group("/api/queue") queue := e.Group("/api/queue")
{ {
if os.Getenv("CANARY") == "true" {
queue.Use(middleware.AgentMust()) queue.Use(middleware.AgentMust())
queue.POST("/pull", api.Pull) queue.POST("/pull", api.Pull)
queue.POST("/pull/:os/:arch", api.Pull) queue.POST("/pull/:os/:arch", api.Pull)
@ -171,6 +172,7 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
queue.POST("/stream/:id", api.Stream) queue.POST("/stream/:id", api.Stream)
queue.POST("/status/:id", api.Update) queue.POST("/status/:id", api.Update)
} }
}
gitlab := e.Group("/gitlab/:owner/:name") gitlab := e.Group("/gitlab/:owner/:name")
{ {

View file

@ -9,13 +9,13 @@ type Setter interface {
Set(string, interface{}) Set(string, interface{})
} }
// FromContext returns the Mux associated with this context. // FromContext returns the Stream associated with this context.
func FromContext(c context.Context) Mux { func FromContext(c context.Context) Stream {
return c.Value(key).(Mux) return c.Value(key).(Stream)
} }
// ToContext adds the Mux to this context if it supports // ToContext adds the Stream to this context if it supports the
// the Setter interface. // Setter interface.
func ToContext(c Setter, m Mux) { func ToContext(c Setter, s Stream) {
c.Set(key, m) c.Set(key, s)
} }

44
stream/reader.go Normal file
View file

@ -0,0 +1,44 @@
package stream
import (
"bytes"
"io"
)
type reader struct {
w *writer
off int
}
// Read reads from the Buffer
func (r *reader) Read(p []byte) (n int, err error) {
r.w.RLock()
defer r.w.RUnlock()
var m int
for len(p) > 0 {
m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p)
n += m
r.off += n
if n > 0 {
break
}
if r.w.Closed() {
err = io.EOF
break
}
r.w.Wait()
}
return
}
func (r *reader) Close() error {
// TODO close should remove reader from the parent!
return nil
}

View file

@ -1,7 +1,5 @@
package stream package stream
//go:generate mockery -name Mux -output mock -case=underscore
import ( import (
"bufio" "bufio"
"io" "io"
@ -10,43 +8,32 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
// Mux defines a stream multiplexer // Stream manages the stream of build logs.
type Mux interface { type Stream interface {
// Create creates and returns a new stream identified by Create(string) error
// the specified key. Delete(string) error
Create(key string) (io.ReadCloser, io.WriteCloser, error) Reader(string) (io.Reader, error)
Writer(string) (io.WriteCloser, error)
// Open returns the existing stream by key. If the stream
// does not exist an error is returned.
Open(key string) (io.ReadCloser, io.WriteCloser, error)
// Remove deletes the stream by key.
Remove(key string) error
// Exists return true if the stream exists.
Exists(key string) bool
} }
// Create creates and returns a new stream identified // Create creates a new stream.
// by the specified key. func Create(c context.Context, key string) error {
func Create(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) {
return FromContext(c).Create(key) return FromContext(c).Create(key)
} }
// Open returns the existing stream by key. If the stream does // Reader opens the stream for reading.
// not exist an error is returned. func Reader(c context.Context, key string) (io.Reader, error) {
func Open(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) { return FromContext(c).Reader(key)
return FromContext(c).Open(key)
} }
// Exists return true if the stream exists. // Writer opens the stream for writing.
func Exists(c context.Context, key string) bool { func Writer(c context.Context, key string) (io.WriteCloser, error) {
return FromContext(c).Exists(key) return FromContext(c).Writer(key)
} }
// Remove deletes the stream by key. // Delete deletes the stream by key.
func Remove(c context.Context, key string) error { func Delete(c context.Context, key string) error {
return FromContext(c).Remove(key) return FromContext(c).Delete(key)
} }
// ToKey is a helper function that converts a unique identifier // ToKey is a helper function that converts a unique identifier
@ -55,9 +42,9 @@ func ToKey(i int64) string {
return strconv.FormatInt(i, 10) return strconv.FormatInt(i, 10)
} }
// Copy copies the stream from the source to the destination in // Copy copies the stream from the source to the destination in valid JSON
// valid JSON format. This converts the logs, which are per-line // format. This converts the logs, which are per-line JSON objects, to a
// JSON objects, to a JSON array. // proper JSON array.
func Copy(dest io.Writer, src io.Reader) error { func Copy(dest io.Writer, src io.Reader) error {
io.WriteString(dest, "[") io.WriteString(dest, "[")

View file

@ -1,96 +1,71 @@
package stream package stream
import ( import (
"fmt"
"io" "io"
"sync" "sync"
"github.com/djherbis/fscache"
) )
var noexp fscache.Reaper type stream struct {
sync.Mutex
writers map[string]*writer
}
// New creates a new Mux using an in-memory filesystem. // New returns a new in-memory implementation of Stream.
func New() Mux { func New() Stream {
fs := fscache.NewMemFs() return &stream{
c, err := fscache.NewCache(fs, noexp) writers: map[string]*writer{},
if err != nil {
panic(err)
} }
return &mux{c}
} }
// New creates a new Mux using a persistent filesystem. // Reader returns an io.Reader for reading from to the stream.
func NewFileSystem(path string) Mux { func (s *stream) Reader(name string) (io.Reader, error) {
fs, err := fscache.NewFs(path, 0777) s.Lock()
if err != nil { defer s.Unlock()
panic(err)
if !s.exists(name) {
return nil, fmt.Errorf("stream: cannot read stream %s, not found", name)
} }
c, err := fscache.NewCache(fs, noexp) return s.writers[name].Reader()
if err != nil { }
panic(err)
// Writer returns an io.WriteCloser for writing to the stream.
func (s *stream) Writer(name string) (io.WriteCloser, error) {
s.Lock()
defer s.Unlock()
if !s.exists(name) {
return nil, fmt.Errorf("stream: cannot write stream %s, not found", name)
} }
return &mux{c} return s.writers[name], nil
} }
// mux wraps the default fscache.Cache to match the // Create creates a new stream.
// defined interface and to wrap the ReadCloser and func (s *stream) Create(name string) error {
// WriteCloser to avoid panics when we over-aggressively s.Lock()
// close streams. defer s.Unlock()
type mux struct {
cache fscache.Cache
}
func (m *mux) Create(key string) (io.ReadCloser, io.WriteCloser, error) { if s.exists(name) {
rc, wc, err := m.cache.Get(key) return fmt.Errorf("stream: cannot create stream %s, already exists", name)
if rc != nil {
rc = &closeOnceReader{ReaderAt: rc, ReadCloser: rc}
} }
if wc != nil {
wc = &closeOnceWriter{WriteCloser: wc}
}
return rc, wc, err
}
func (m *mux) Open(key string) (io.ReadCloser, io.WriteCloser, error) { s.writers[name] = newWriter()
return m.Create(key)
}
func (m *mux) Exists(key string) bool {
return m.cache.Exists(key)
}
func (m *mux) Remove(key string) error {
return m.cache.Remove(key)
}
// closeOnceReader is a helper function that ensures
// the reader is only closed once. This is because
// attempting to close the fscache reader more than
// once results in a panic.
type closeOnceReader struct {
io.ReaderAt
io.ReadCloser
once sync.Once
}
func (c *closeOnceReader) Close() error {
c.once.Do(func() {
c.ReadCloser.Close()
})
return nil return nil
} }
// closeOnceWriter is a helper function that ensures // Delete deletes the stream by key.
// the writer is only closed once. This is because func (s *stream) Delete(name string) error {
// attempting to close the fscache writer more than s.Lock()
// once results in a panic. defer s.Unlock()
type closeOnceWriter struct {
io.WriteCloser if !s.exists(name) {
once sync.Once return fmt.Errorf("stream: cannot delete stream %s, not found", name)
}
delete(s.writers, name)
return s.writers[name].Close()
} }
func (c *closeOnceWriter) Close() error { func (s *stream) exists(name string) bool {
c.once.Do(func() { _, exists := s.writers[name]
c.WriteCloser.Close() return exists
})
return nil
} }

View file

@ -1 +0,0 @@
package stream

52
stream/writer.go Normal file
View file

@ -0,0 +1,52 @@
package stream
import (
"bytes"
"io"
"sync"
"sync/atomic"
)
type writer struct {
sync.RWMutex
*sync.Cond
buffer bytes.Buffer
closed uint32
}
func newWriter() *writer {
var w writer
w.Cond = sync.NewCond(w.RWMutex.RLocker())
return &w
}
func (w *writer) Write(p []byte) (n int, err error) {
defer w.Broadcast()
w.Lock()
defer w.Unlock()
if w.Closed() {
return 0, io.EOF
}
return w.buffer.Write(p)
}
func (w *writer) Reader() (io.Reader, error) {
return &reader{w: w}, nil
}
func (w *writer) Wait() {
if !w.Closed() {
w.Cond.Wait()
}
}
func (w *writer) Close() error {
atomic.StoreUint32(&w.closed, 1)
w.Cond.Broadcast()
return nil
}
func (w *writer) Closed() bool {
return atomic.LoadUint32(&w.closed) == 1
}

View file

@ -91,24 +91,15 @@ func GetStream2(c *gin.Context) {
return return
} }
rc, wc, err := stream.Open(c, stream.ToKey(job.ID)) rc, err := stream.Reader(c, stream.ToKey(job.ID))
if err != nil { if err != nil {
c.AbortWithError(404, err) c.AbortWithError(404, err)
return return
} }
defer func() {
if wc != nil {
wc.Close()
}
if rc != nil {
rc.Close()
}
}()
go func() { go func() {
<-c.Writer.CloseNotify() <-c.Writer.CloseNotify()
rc.Close() // rc.Close()
}() }()
var line int var line int
@ -125,4 +116,6 @@ func GetStream2(c *gin.Context) {
} }
c.Writer.Flush() c.Writer.Flush()
} }
log.Debugf("Closed stream %s#%d", repo.FullName, build.Number)
} }