diff --git a/Dockerfile b/Dockerfile index 80cc69666..ea484565d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,4 +21,9 @@ ENV DATABASE_CONFIG=/var/lib/drone/drone.sqlite ADD drone_static /drone_static +# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf, +# but Go and CGO rely on /etc/nsswitch.conf to check the order of DNS resolving. +# To fix this we just create /etc/nsswitch.conf and add the following line: +#RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf + ENTRYPOINT ["/drone_static"] diff --git a/bus/bus.go b/bus/bus.go new file mode 100644 index 000000000..7bb39c534 --- /dev/null +++ b/bus/bus.go @@ -0,0 +1,40 @@ +package bus + +//go:generate mockery -name Bus -output mock -case=underscore + +import "golang.org/x/net/context" + +// Bus represents an event bus implementation that +// allows a publisher to broadcast Event notifications +// to a list of subscribers. +type Bus interface { + // Publish broadcasts an event to all subscribers. + Publish(*Event) + + // Subscribe adds the channel to the list of + // subscribers. Each subscriber in the list will + // receive broadcast events. + Subscribe(chan *Event) + + // Unsubscribe removes the channel from the list + // of subscribers. + Unsubscribe(chan *Event) +} + +// Publish broadcasts an event to all subscribers. +func Publish(c context.Context, event *Event) { + FromContext(c).Publish(event) +} + +// Subscribe adds the channel to the list of +// subscribers. Each subscriber in the list will +// receive broadcast events. +func Subscribe(c context.Context, eventc chan *Event) { + FromContext(c).Subscribe(eventc) +} + +// Unsubscribe removes the channel from the +// list of subscribers. +func Unsubscribe(c context.Context, eventc chan *Event) { + FromContext(c).Unsubscribe(eventc) +} diff --git a/bus/bus_impl.go b/bus/bus_impl.go new file mode 100644 index 000000000..d0f0e6a64 --- /dev/null +++ b/bus/bus_impl.go @@ -0,0 +1,46 @@ +package bus + +import ( + "sync" +) + +type eventbus struct { + sync.Mutex + subs map[chan *Event]bool +} + +// New creates a simple event bus that manages a list of +// subscribers to which events are published. +func New() Bus { + return newEventbus() +} + +func newEventbus() *eventbus { + return &eventbus{ + subs: make(map[chan *Event]bool), + } +} + +func (b *eventbus) Subscribe(c chan *Event) { + b.Lock() + b.subs[c] = true + b.Unlock() +} + +func (b *eventbus) Unsubscribe(c chan *Event) { + b.Lock() + delete(b.subs, c) + b.Unlock() +} + +func (b *eventbus) Publish(event *Event) { + b.Lock() + defer b.Unlock() + + for s := range b.subs { + go func(c chan *Event) { + defer recover() + c <- event + }(s) + } +} diff --git a/bus/bus_impl_test.go b/bus/bus_impl_test.go new file mode 100644 index 000000000..ffcb1e563 --- /dev/null +++ b/bus/bus_impl_test.go @@ -0,0 +1,73 @@ +package bus + +import ( + "sync" + "testing" + + "github.com/drone/drone/model" + . "github.com/franela/goblin" + "github.com/gin-gonic/gin" +) + +func TestBus(t *testing.T) { + g := Goblin(t) + g.Describe("Event bus", func() { + + g.It("Should unsubscribe", func() { + c := new(gin.Context) + b := newEventbus() + ToContext(c, b) + + c1 := make(chan *Event) + c2 := make(chan *Event) + Subscribe(c, c1) + Subscribe(c, c2) + + g.Assert(len(b.subs)).Equal(2) + }) + + g.It("Should subscribe", func() { + c := new(gin.Context) + b := newEventbus() + ToContext(c, b) + + c1 := make(chan *Event) + c2 := make(chan *Event) + Subscribe(c, c1) + Subscribe(c, c2) + + g.Assert(len(b.subs)).Equal(2) + + Unsubscribe(c, c1) + Unsubscribe(c, c2) + + g.Assert(len(b.subs)).Equal(0) + }) + + g.It("Should publish", func() { + c := new(gin.Context) + b := New() + ToContext(c, b) + + e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) + e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) + c1 := make(chan *Event) + + Subscribe(c, c1) + + var wg sync.WaitGroup + wg.Add(1) + + var r1, r2 *Event + go func() { + r1 = <-c1 + r2 = <-c1 + wg.Done() + }() + Publish(c, e1) + Publish(c, e2) + wg.Wait() + }) + }) + +} diff --git a/bus/context.go b/bus/context.go new file mode 100644 index 000000000..4eccfa7f0 --- /dev/null +++ b/bus/context.go @@ -0,0 +1,21 @@ +package bus + +import "golang.org/x/net/context" + +const key = "bus" + +// Setter defines a context that enables setting values. +type Setter interface { + Set(string, interface{}) +} + +// FromContext returns the Bus associated with this context. +func FromContext(c context.Context) Bus { + return c.Value(key).(Bus) +} + +// ToContext adds the Bus to this context if it supports +// the Setter interface. +func ToContext(c Setter, b Bus) { + c.Set(key, b) +} diff --git a/bus/types.go b/bus/types.go new file mode 100644 index 000000000..3dded50b3 --- /dev/null +++ b/bus/types.go @@ -0,0 +1,32 @@ +package bus + +import "github.com/drone/drone/model" + +// EventType defines the possible types of build events. +type EventType string + +const ( + Enqueued EventType = "enqueued" + Started EventType = "started" + Finished EventType = "finished" + Cancelled EventType = "cancelled" +) + +// Event represents a build event. +type Event struct { + Type EventType `json:"type"` + Repo model.Repo `json:"repo"` + Build model.Build `json:"build"` + Job model.Job `json:"job"` +} + +// NewEvent creates a new Event for the build, using copies of +// the build data to avoid possible mutation or race conditions. +func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { + return &Event{ + Type: t, + Repo: *r, + Build: *b, + Job: *j, + } +} diff --git a/queue/context.go b/queue/context.go new file mode 100644 index 000000000..98a78e9ec --- /dev/null +++ b/queue/context.go @@ -0,0 +1,23 @@ +package queue + +import ( + "golang.org/x/net/context" +) + +const key = "queue" + +// Setter defines a context that enables setting values. +type Setter interface { + Set(string, interface{}) +} + +// FromContext returns the Queue associated with this context. +func FromContext(c context.Context) Queue { + return c.Value(key).(Queue) +} + +// ToContext adds the Queue to this context if it supports +// the Setter interface. +func ToContext(c Setter, q Queue) { + c.Set(key, q) +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 000000000..3399d1987 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,67 @@ +package queue + +//go:generate mockery -name Queue -output mock -case=underscore + +import ( + "errors" + + "golang.org/x/net/context" +) + +// ErrNotFound indicates the requested work item does not +// exist in the queue. +var ErrNotFound = errors.New("queue item not found") + +type Queue interface { + // Publish inserts work at the tail of this queue, waiting for + // space to become available if the queue is full. + Publish(*Work) error + + // Remove removes the specified work item from this queue, + // if it is present. + Remove(*Work) error + + // PullClose retrieves and removes the head of this queue, + // waiting if necessary until work becomes available. + Pull() *Work + + // PullClose retrieves and removes the head of this queue, + // waiting if necessary until work becomes available. The + // CloseNotifier should be provided to clone the channel + // if the subscribing client terminates its connection. + PullClose(CloseNotifier) *Work +} + +// Publish inserts work at the tail of this queue, waiting for +// space to become available if the queue is full. +func Publish(c context.Context, w *Work) error { + return FromContext(c).Publish(w) +} + +// Remove removes the specified work item from this queue, +// if it is present. +func Remove(c context.Context, w *Work) error { + return FromContext(c).Remove(w) +} + +// PullClose retrieves and removes the head of this queue, +// waiting if necessary until work becomes available. +func Pull(c context.Context) *Work { + return FromContext(c).Pull() +} + +// PullClose retrieves and removes the head of this queue, +// waiting if necessary until work becomes available. The +// CloseNotifier should be provided to clone the channel +// if the subscribing client terminates its connection. +func PullClose(c context.Context, cn CloseNotifier) *Work { + return FromContext(c).PullClose(cn) +} + +// CloseNotifier defines a datastructure that is capable of notifying +// a subscriber when its connection is closed. +type CloseNotifier interface { + // CloseNotify returns a channel that receives a single value + // when the client connection has gone away. + CloseNotify() <-chan bool +} diff --git a/queue/queue_impl.go b/queue/queue_impl.go new file mode 100644 index 000000000..8882bc24d --- /dev/null +++ b/queue/queue_impl.go @@ -0,0 +1,85 @@ +package queue + +import "sync" + +type queue struct { + sync.Mutex + + items map[*Work]struct{} + itemc chan *Work +} + +func New() Queue { + return newQueue() +} + +func newQueue() *queue { + return &queue{ + items: make(map[*Work]struct{}), + itemc: make(chan *Work, 999), + } +} + +func (q *queue) Publish(work *Work) error { + q.Lock() + q.items[work] = struct{}{} + q.Unlock() + q.itemc <- work + return nil +} + +func (q *queue) Remove(work *Work) error { + q.Lock() + defer q.Unlock() + + _, ok := q.items[work] + if !ok { + return ErrNotFound + } + var items []*Work + + // loop through and drain all items + // from the +drain: + for { + select { + case item := <-q.itemc: + items = append(items, item) + default: + break drain + } + } + + // re-add all items to the queue except + // the item we're trying to remove + for _, item := range items { + if item == work { + delete(q.items, work) + continue + } + q.itemc <- item + } + return nil +} + +func (q *queue) Pull() *Work { + work := <-q.itemc + q.Lock() + delete(q.items, work) + q.Unlock() + return work +} + +func (q *queue) PullClose(cn CloseNotifier) *Work { + for { + select { + case <-cn.CloseNotify(): + return nil + case work := <-q.itemc: + q.Lock() + delete(q.items, work) + q.Unlock() + return work + } + } +} diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go new file mode 100644 index 000000000..778576232 --- /dev/null +++ b/queue/queue_impl_test.go @@ -0,0 +1,93 @@ +package queue + +import ( + "sync" + "testing" + + . "github.com/franela/goblin" + "github.com/gin-gonic/gin" +) + +func TestBuild(t *testing.T) { + g := Goblin(t) + g.Describe("Queue", func() { + + g.It("Should publish item", func() { + c := new(gin.Context) + q := newQueue() + ToContext(c, q) + + w1 := &Work{} + w2 := &Work{} + Publish(c, w1) + Publish(c, w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + }) + + g.It("Should remove item", func() { + c := new(gin.Context) + q := newQueue() + ToContext(c, q) + + w1 := &Work{} + w2 := &Work{} + w3 := &Work{} + Publish(c, w1) + Publish(c, w2) + Publish(c, w3) + Remove(c, w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + + g.Assert(Pull(c)).Equal(w1) + g.Assert(Pull(c)).Equal(w3) + g.Assert(Remove(c, w2)).Equal(ErrNotFound) + }) + + g.It("Should pull item", func() { + c := new(gin.Context) + q := New() + ToContext(c, q) + + cn := new(closeNotifier) + cn.closec = make(chan bool, 1) + w1 := &Work{} + w2 := &Work{} + + Publish(c, w1) + g.Assert(Pull(c)).Equal(w1) + + Publish(c, w2) + g.Assert(PullClose(c, cn)).Equal(w2) + }) + + g.It("Should cancel pulling item", func() { + c := new(gin.Context) + q := New() + ToContext(c, q) + + cn := new(closeNotifier) + cn.closec = make(chan bool, 1) + var wg sync.WaitGroup + go func() { + wg.Add(1) + g.Assert(PullClose(c, cn) == nil).IsTrue() + wg.Done() + }() + go func() { + cn.closec <- true + }() + wg.Wait() + + }) + }) +} + +type closeNotifier struct { + closec chan bool +} + +func (c *closeNotifier) CloseNotify() <-chan bool { + return c.closec +} diff --git a/queue/types.go b/queue/types.go new file mode 100644 index 000000000..1740c7fe9 --- /dev/null +++ b/queue/types.go @@ -0,0 +1,18 @@ +package queue + +import "github.com/drone/drone/model" + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + Yaml string `json:"config"` + YamlEnc string `json:"secret"` + Repo *model.Repo `json:"repo"` + Build *model.Build `json:"build"` + BuildLast *model.Build `json:"build_last"` + Job *model.Job `json:"job"` + Netrc *model.Netrc `json:"netrc"` + Keys *model.Key `json:"keys"` + System *model.System `json:"system"` + User *model.User `json:"user"` +} diff --git a/stream/context.go b/stream/context.go new file mode 100644 index 000000000..9b89accf1 --- /dev/null +++ b/stream/context.go @@ -0,0 +1,21 @@ +package stream + +import "golang.org/x/net/context" + +const key = "stream" + +// Setter defines a context that enables setting values. +type Setter interface { + Set(string, interface{}) +} + +// FromContext returns the Mux associated with this context. +func FromContext(c context.Context) Mux { + return c.Value(key).(Mux) +} + +// ToContext adds the Mux to this context if it supports +// the Setter interface. +func ToContext(c Setter, m Mux) { + c.Set(key, m) +} diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 000000000..787ea48e2 --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,73 @@ +package stream + +//go:generate mockery -name Mux -output mock -case=underscore + +import ( + "bufio" + "io" + "strconv" + + "golang.org/x/net/context" +) + +// Mux defines a stream multiplexer +type Mux interface { + // Create creates and returns a new stream identified by + // the specified key. + Create(key string) (io.ReadCloser, io.WriteCloser, error) + + // Open returns the existing stream by key. If the stream + // does not exist an error is returned. + Open(key string) (io.ReadCloser, io.WriteCloser, error) + + // Remove deletes the stream by key. + Remove(key string) error + + // Exists return true if the stream exists. + Exists(key string) bool +} + +// Create creates and returns a new stream identified +// by the specified key. +func Create(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) { + return FromContext(c).Create(key) +} + +// Open returns the existing stream by key. If the stream does +// not exist an error is returned. +func Open(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) { + return FromContext(c).Open(key) +} + +// Exists return true if the stream exists. +func Exists(c context.Context, key string) bool { + return FromContext(c).Exists(key) +} + +// Remove deletes the stream by key. +func Remove(c context.Context, key string) error { + return FromContext(c).Remove(key) +} + +// ToKey is a helper function that converts a unique identifier +// of type int64 into a string. +func ToKey(i int64) string { + return strconv.FormatInt(i, 10) +} + +// Copy copies the stream from the source to the destination in +// valid JSON format. This converts the logs, which are per-line +// JSON objects, to a JSON array. +func Copy(dest io.Writer, src io.Reader) error { + io.WriteString(dest, "[") + + scanner := bufio.NewScanner(src) + for scanner.Scan() { + io.WriteString(dest, scanner.Text()) + io.WriteString(dest, ",\n") + } + + io.WriteString(dest, "{}]") + + return nil +} diff --git a/stream/stream_impl.go b/stream/stream_impl.go new file mode 100644 index 000000000..a25a3b155 --- /dev/null +++ b/stream/stream_impl.go @@ -0,0 +1,96 @@ +package stream + +import ( + "io" + "sync" + + "github.com/djherbis/fscache" +) + +var noexp fscache.Reaper + +// New creates a new Mux using an in-memory filesystem. +func New() Mux { + fs := fscache.NewMemFs() + c, err := fscache.NewCache(fs, noexp) + if err != nil { + panic(err) + } + return &mux{c} +} + +// New creates a new Mux using a persistent filesystem. +func NewFileSystem(path string) Mux { + fs, err := fscache.NewFs(path, 0777) + if err != nil { + panic(err) + } + c, err := fscache.NewCache(fs, noexp) + if err != nil { + panic(err) + } + return &mux{c} +} + +// mux wraps the default fscache.Cache to match the +// defined interface and to wrap the ReadCloser and +// WriteCloser to avoid panics when we over-aggressively +// close streams. +type mux struct { + cache fscache.Cache +} + +func (m *mux) Create(key string) (io.ReadCloser, io.WriteCloser, error) { + rc, wc, err := m.cache.Get(key) + if rc != nil { + rc = &closeOnceReader{ReaderAt: rc, ReadCloser: rc} + } + if wc != nil { + wc = &closeOnceWriter{WriteCloser: wc} + } + return rc, wc, err +} + +func (m *mux) Open(key string) (io.ReadCloser, io.WriteCloser, error) { + return m.Create(key) +} + +func (m *mux) Exists(key string) bool { + return m.cache.Exists(key) +} +func (m *mux) Remove(key string) error { + return m.cache.Remove(key) +} + +// closeOnceReader is a helper function that ensures +// the reader is only closed once. This is because +// attempting to close the fscache reader more than +// once results in a panic. +type closeOnceReader struct { + io.ReaderAt + io.ReadCloser + once sync.Once +} + +func (c *closeOnceReader) Close() error { + c.once.Do(func() { + c.ReadCloser.Close() + }) + return nil +} + +// closeOnceWriter is a helper function that ensures +// the writer is only closed once. This is because +// attempting to close the fscache writer more than +// once results in a panic. +type closeOnceWriter struct { + io.WriteCloser + once sync.Once +} + +func (c *closeOnceWriter) Close() error { + c.once.Do(func() { + c.WriteCloser.Close() + }) + return nil +} diff --git a/stream/stream_impl_test.go b/stream/stream_impl_test.go new file mode 100644 index 000000000..11541cc2b --- /dev/null +++ b/stream/stream_impl_test.go @@ -0,0 +1 @@ +package stream diff --git a/vendor/github.com/djherbis/fscache/LICENSE b/vendor/github.com/djherbis/fscache/LICENSE new file mode 100644 index 000000000..1e7b7cc09 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Dustin H + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/djherbis/fscache/README.md b/vendor/github.com/djherbis/fscache/README.md new file mode 100644 index 000000000..bae10838e --- /dev/null +++ b/vendor/github.com/djherbis/fscache/README.md @@ -0,0 +1,93 @@ +fscache +========== + +[![GoDoc](https://godoc.org/github.com/djherbis/fscache?status.svg)](https://godoc.org/github.com/djherbis/fscache) +[![Release](https://img.shields.io/github/release/djherbis/fscache.svg)](https://github.com/djherbis/fscache/releases/latest) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt) +[![Build Status](https://travis-ci.org/djherbis/fscache.svg?branch=master)](https://travis-ci.org/djherbis/fscache) +[![Coverage Status](https://coveralls.io/repos/djherbis/fscache/badge.svg?branch=master)](https://coveralls.io/r/djherbis/fscache?branch=master) +[![Go Report Card](https://goreportcard.com/badge/github.com/djherbis/fscache)](https://goreportcard.com/report/github.com/djherbis/fscache) + +Usage +------------ +Streaming File Cache for #golang + +fscache allows multiple readers to read from a cache while its being written to. [blog post](https://djherbis.github.io/post/fscache/) + +Using the Cache directly: + +```go +package main + +import ( + "io" + "log" + "os" + "time" + + "gopkg.in/djherbis/fscache.v0" +) + +func main() { + + // create the cache, keys expire after 1 hour. + c, err := fscache.New("./cache", 0755, time.Hour) + if err != nil { + log.Fatal(err.Error()) + } + + // wipe the cache when done + defer c.Clean() + + // Get() and it's streams can be called concurrently but just for example: + for i := 0; i < 3; i++ { + r, w, err := c.Get("stream") + if err != nil { + log.Fatal(err.Error()) + } + + if w != nil { // a new stream, write to it. + go func(){ + w.Write([]byte("hello world\n")) + w.Close() + }() + } + + // the stream has started, read from it + io.Copy(os.Stdout, r) + r.Close() + } +} +``` + +A Caching Middle-ware: + +```go +package main + +import( + "net/http" + "time" + + "gopkg.in/djherbis/fscache.v0" +) + +func main(){ + c, err := fscache.New("./cache", 0700, 0) + if err != nil { + log.Fatal(err.Error()) + } + + handler := func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "%v: %s", time.Now(), "hello world") + } + + http.ListenAndServe(":8080", fscache.Handler(c, http.HandlerFunc(handler))) +} +``` + +Installation +------------ +```sh +go get gopkg.in/djherbis/fscache.v0 +``` diff --git a/vendor/github.com/djherbis/fscache/distrib.go b/vendor/github.com/djherbis/fscache/distrib.go new file mode 100644 index 000000000..60994cc58 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/distrib.go @@ -0,0 +1,85 @@ +package fscache + +import ( + "bytes" + "crypto/sha1" + "encoding/binary" + "io" +) + +// Distributor provides a way to partition keys into Caches. +type Distributor interface { + + // GetCache will always return the same Cache for the same key. + GetCache(key string) Cache + + // Clean should wipe all the caches this Distributor manages + Clean() error +} + +// stdDistribution distributes the keyspace evenly. +func stdDistribution(key string, n uint64) uint64 { + h := sha1.New() + io.WriteString(h, key) + buf := bytes.NewBuffer(h.Sum(nil)[:8]) + i, _ := binary.ReadUvarint(buf) + return i % n +} + +// NewDistributor returns a Distributor which evenly distributes the keyspace +// into the passed caches. +func NewDistributor(caches ...Cache) Distributor { + if len(caches) == 0 { + return nil + } + return &distrib{ + distribution: stdDistribution, + caches: caches, + size: uint64(len(caches)), + } +} + +type distrib struct { + distribution func(key string, n uint64) uint64 + caches []Cache + size uint64 +} + +func (d *distrib) GetCache(key string) Cache { + return d.caches[d.distribution(key, d.size)] +} + +// BUG(djherbis): Return an error if cleaning fails +func (d *distrib) Clean() error { + for _, c := range d.caches { + c.Clean() + } + return nil +} + +// NewPartition returns a Cache which uses the Caches defined by the passed Distributor. +func NewPartition(d Distributor) Cache { + return &partition{ + distributor: d, + } +} + +type partition struct { + distributor Distributor +} + +func (p *partition) Get(key string) (ReadAtCloser, io.WriteCloser, error) { + return p.distributor.GetCache(key).Get(key) +} + +func (p *partition) Remove(key string) error { + return p.distributor.GetCache(key).Remove(key) +} + +func (p *partition) Exists(key string) bool { + return p.distributor.GetCache(key).Exists(key) +} + +func (p *partition) Clean() error { + return p.distributor.Clean() +} diff --git a/vendor/github.com/djherbis/fscache/fs.go b/vendor/github.com/djherbis/fscache/fs.go new file mode 100644 index 000000000..91aaae347 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/fs.go @@ -0,0 +1,199 @@ +package fscache + +import ( + "bytes" + "crypto/md5" + "crypto/rand" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" + + "gopkg.in/djherbis/atime.v1" + "gopkg.in/djherbis/stream.v1" +) + +// FileSystem is used as the source for a Cache. +type FileSystem interface { + // Stream FileSystem + stream.FileSystem + + // Reload should look through the FileSystem and call the suplied fn + // with the key/filename pairs that are found. + Reload(func(key, name string)) error + + // RemoveAll should empty the FileSystem of all files. + RemoveAll() error + + // AccessTimes takes a File.Name() and returns the last time the file was read, + // and the last time it was written to. + // It will be used to check expiry of a file, and must be concurrent safe + // with modifications to the FileSystem (writes, reads etc.) + AccessTimes(name string) (rt, wt time.Time, err error) +} + +type stdFs struct { + root string +} + +// NewFs returns a FileSystem rooted at directory dir. +// Dir is created with perms if it doesn't exist. +func NewFs(dir string, mode os.FileMode) (FileSystem, error) { + return &stdFs{root: dir}, os.MkdirAll(dir, mode) +} + +func (fs *stdFs) Reload(add func(key, name string)) error { + files, err := ioutil.ReadDir(fs.root) + if err != nil { + return err + } + + addfiles := make(map[string]struct { + os.FileInfo + key string + }) + + for _, f := range files { + + if strings.HasSuffix(f.Name(), ".key") { + continue + } + + key, err := fs.getKey(f.Name()) + if err != nil { + return err + } + fi, ok := addfiles[key] + + if !ok || fi.ModTime().Before(f.ModTime()) { + if ok { + fs.Remove(fi.Name()) + } + addfiles[key] = struct { + os.FileInfo + key string + }{ + FileInfo: f, + key: key, + } + } else { + fs.Remove(f.Name()) + } + + } + + for _, f := range addfiles { + path, err := filepath.Abs(filepath.Join(fs.root, f.Name())) + if err != nil { + return err + } + add(f.key, path) + } + + return nil +} + +func (fs *stdFs) Create(name string) (stream.File, error) { + name, err := fs.makeName(name) + if err != nil { + return nil, err + } + return fs.create(name) +} + +func (fs *stdFs) create(name string) (stream.File, error) { + return os.OpenFile(filepath.Join(fs.root, name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +} + +func (fs *stdFs) Open(name string) (stream.File, error) { + return os.Open(name) +} + +func (fs *stdFs) Remove(name string) error { + os.Remove(fmt.Sprintf("%s.key", name)) + return os.Remove(name) +} + +func (fs *stdFs) RemoveAll() error { + return os.RemoveAll(fs.root) +} + +func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) { + fi, err := os.Stat(name) + if err != nil { + return rt, wt, err + } + return atime.Get(fi), fi.ModTime(), nil +} + +const ( + saltSize = 8 + maxShort = 20 + shortPrefix = "s" + longPrefix = "l" +) + +func salt() string { + buf := bytes.NewBufferString("") + enc := base64.NewEncoder(base64.URLEncoding, buf) + io.CopyN(enc, rand.Reader, saltSize) + return buf.String() +} + +func tob64(s string) string { + buf := bytes.NewBufferString("") + enc := base64.NewEncoder(base64.URLEncoding, buf) + enc.Write([]byte(s)) + enc.Close() + return buf.String() +} + +func fromb64(s string) string { + buf := bytes.NewBufferString(s) + dec := base64.NewDecoder(base64.URLEncoding, buf) + out := bytes.NewBufferString("") + io.Copy(out, dec) + return out.String() +} + +func (fs *stdFs) makeName(key string) (string, error) { + b64key := tob64(key) + // short name + if len(b64key) < maxShort { + return fmt.Sprintf("%s%s%s", shortPrefix, salt(), b64key), nil + } + + // long name + hash := md5.Sum([]byte(key)) + name := fmt.Sprintf("%s%s%x", longPrefix, salt(), hash[:]) + f, err := fs.create(fmt.Sprintf("%s.key", name)) + if err != nil { + return "", err + } + _, err = f.Write([]byte(key)) + f.Close() + return name, err +} + +func (fs *stdFs) getKey(name string) (string, error) { + // short name + if strings.HasPrefix(name, shortPrefix) { + return fromb64(strings.TrimPrefix(name, shortPrefix)[saltSize:]), nil + } + + // long name + f, err := fs.Open(filepath.Join(fs.root, fmt.Sprintf("%s.key", name))) + if err != nil { + return "", err + } + defer f.Close() + key, err := ioutil.ReadAll(f) + if err != nil { + return "", err + } + return string(key), nil +} diff --git a/vendor/github.com/djherbis/fscache/fscache.go b/vendor/github.com/djherbis/fscache/fscache.go new file mode 100644 index 000000000..2d8acb1d4 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/fscache.go @@ -0,0 +1,303 @@ +package fscache + +import ( + "io" + "os" + "sync" + "sync/atomic" + "time" + + "gopkg.in/djherbis/stream.v1" +) + +// Cache works like a concurrent-safe map for streams. +type Cache interface { + + // Get manages access to the streams in the cache. + // If the key does not exist, w != nil and you can start writing to the stream. + // If the key does exist, w == nil. + // r will always be non-nil as long as err == nil and you must close r when you're done reading. + // Get can be called concurrently, and writing and reading is concurrent safe. + Get(key string) (ReadAtCloser, io.WriteCloser, error) + + // Remove deletes the stream from the cache, blocking until the underlying + // file can be deleted (all active streams finish with it). + // It is safe to call Remove concurrently with Get. + Remove(key string) error + + // Exists checks if a key is in the cache. + // It is safe to call Exists concurrently with Get. + Exists(key string) bool + + // Clean will empty the cache and delete the cache folder. + // Clean is not safe to call while streams are being read/written. + Clean() error +} + +type cache struct { + mu sync.RWMutex + files map[string]fileStream + grim Reaper + fs FileSystem +} + +// ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range +// Requests are possible. +type ReadAtCloser interface { + io.ReadCloser + io.ReaderAt +} + +type fileStream interface { + next() (ReadAtCloser, error) + inUse() bool + io.WriteCloser + Remove() error + Name() string +} + +// New creates a new Cache using NewFs(dir, perms). +// expiry is the duration after which an un-accessed key will be removed from +// the cache, a zero value expiro means never expire. +func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) { + fs, err := NewFs(dir, perms) + if err != nil { + return nil, err + } + var grim Reaper + if expiry > 0 { + grim = &reaper{ + expiry: expiry, + period: expiry, + } + } + return NewCache(fs, grim) +} + +// NewCache creates a new Cache based on FileSystem fs. +// fs.Files() are loaded using the name they were created with as a key. +// Reaper is used to determine when files expire, nil means never expire. +func NewCache(fs FileSystem, grim Reaper) (Cache, error) { + c := &cache{ + files: make(map[string]fileStream), + grim: grim, + fs: fs, + } + err := c.load() + if err != nil { + return nil, err + } + if grim != nil { + c.haunter() + } + return c, nil +} + +func (c *cache) haunter() { + c.haunt() + time.AfterFunc(c.grim.Next(), c.haunter) +} + +func (c *cache) haunt() { + c.mu.Lock() + defer c.mu.Unlock() + + for key, f := range c.files { + if f.inUse() { + continue + } + + lastRead, lastWrite, err := c.fs.AccessTimes(f.Name()) + if err != nil { + continue + } + + if c.grim.Reap(key, lastRead, lastWrite) { + delete(c.files, key) + c.fs.Remove(f.Name()) + } + } + return +} + +func (c *cache) load() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.fs.Reload(func(key, name string) { + c.files[key] = c.oldFile(name) + }) +} + +func (c *cache) Exists(key string) bool { + c.mu.RLock() + defer c.mu.RUnlock() + _, ok := c.files[key] + return ok +} + +func (c *cache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) { + c.mu.RLock() + f, ok := c.files[key] + if ok { + r, err = f.next() + c.mu.RUnlock() + return r, nil, err + } + c.mu.RUnlock() + + c.mu.Lock() + defer c.mu.Unlock() + + f, ok = c.files[key] + if ok { + r, err = f.next() + return r, nil, err + } + + f, err = c.newFile(key) + if err != nil { + return nil, nil, err + } + + r, err = f.next() + if err != nil { + f.Close() + c.fs.Remove(f.Name()) + return nil, nil, err + } + + c.files[key] = f + + return r, f, err +} + +func (c *cache) Remove(key string) error { + c.mu.Lock() + f, ok := c.files[key] + delete(c.files, key) + c.mu.Unlock() + + if ok { + return f.Remove() + } + return nil +} + +func (c *cache) Clean() error { + c.mu.Lock() + defer c.mu.Unlock() + c.files = make(map[string]fileStream) + return c.fs.RemoveAll() +} + +type cachedFile struct { + stream *stream.Stream + handleCounter +} + +func (c *cache) newFile(name string) (fileStream, error) { + s, err := stream.NewStream(name, c.fs) + if err != nil { + return nil, err + } + cf := &cachedFile{ + stream: s, + } + cf.inc() + return cf, nil +} + +func (c *cache) oldFile(name string) fileStream { + return &reloadedFile{ + fs: c.fs, + name: name, + } +} + +type reloadedFile struct { + fs FileSystem + name string + handleCounter + io.WriteCloser // nop Write & Close methods. will never be called. +} + +func (f *reloadedFile) Name() string { return f.name } + +func (f *reloadedFile) Remove() error { + f.waitUntilFree() + return f.fs.Remove(f.name) +} + +func (f *reloadedFile) next() (r ReadAtCloser, err error) { + r, err = f.fs.Open(f.name) + if err == nil { + f.inc() + } + return &cacheReader{r: r, cnt: &f.handleCounter}, err +} + +func (f *cachedFile) Name() string { return f.stream.Name() } + +func (f *cachedFile) Remove() error { return f.stream.Remove() } + +func (f *cachedFile) next() (r ReadAtCloser, err error) { + reader, err := f.stream.NextReader() + if err != nil { + return nil, err + } + f.inc() + return &cacheReader{ + r: reader, + cnt: &f.handleCounter, + }, nil +} + +func (f *cachedFile) Write(p []byte) (int, error) { + return f.stream.Write(p) +} + +func (f *cachedFile) Close() error { + defer f.dec() + return f.stream.Close() +} + +type cacheReader struct { + r ReadAtCloser + cnt *handleCounter +} + +func (r *cacheReader) ReadAt(p []byte, off int64) (n int, err error) { + return r.r.ReadAt(p, off) +} + +func (r *cacheReader) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +func (r *cacheReader) Close() error { + defer r.cnt.dec() + return r.r.Close() +} + +type handleCounter struct { + cnt int64 + grp sync.WaitGroup +} + +func (h *handleCounter) inc() { + h.grp.Add(1) + atomic.AddInt64(&h.cnt, 1) +} + +func (h *handleCounter) dec() { + atomic.AddInt64(&h.cnt, -1) + h.grp.Done() +} + +func (h *handleCounter) inUse() bool { + return atomic.LoadInt64(&h.cnt) > 0 +} + +func (h *handleCounter) waitUntilFree() { + h.grp.Wait() +} diff --git a/vendor/github.com/djherbis/fscache/handler.go b/vendor/github.com/djherbis/fscache/handler.go new file mode 100644 index 000000000..8df85400c --- /dev/null +++ b/vendor/github.com/djherbis/fscache/handler.go @@ -0,0 +1,41 @@ +package fscache + +import ( + "io" + "net/http" +) + +// Handler is a caching middle-ware for http Handlers. +// It responds to http requests via the passed http.Handler, and caches the response +// using the passed cache. The cache key for the request is the req.URL.String(). +// Note: It does not cache http headers. It is more efficient to set them yourself. +func Handler(c Cache, h http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + url := req.URL.String() + r, w, err := c.Get(url) + if err != nil { + h.ServeHTTP(rw, req) + return + } + defer r.Close() + if w != nil { + go func() { + defer w.Close() + h.ServeHTTP(&respWrapper{ + ResponseWriter: rw, + Writer: w, + }, req) + }() + } + io.Copy(rw, r) + }) +} + +type respWrapper struct { + http.ResponseWriter + io.Writer +} + +func (r *respWrapper) Write(p []byte) (int, error) { + return r.Writer.Write(p) +} diff --git a/vendor/github.com/djherbis/fscache/layers.go b/vendor/github.com/djherbis/fscache/layers.go new file mode 100644 index 000000000..74a53d07f --- /dev/null +++ b/vendor/github.com/djherbis/fscache/layers.go @@ -0,0 +1,129 @@ +package fscache + +import ( + "errors" + "io" + "sync" +) + +type layeredCache struct { + layers []Cache +} + +// NewLayered returns a Cache which stores its data in all the passed +// caches, when a key is requested it is loaded into all the caches above the first hit. +func NewLayered(caches ...Cache) Cache { + return &layeredCache{layers: caches} +} + +func (l *layeredCache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) { + var last ReadAtCloser + var writers []io.WriteCloser + + for i, layer := range l.layers { + r, w, err = layer.Get(key) + if err != nil { + if len(writers) > 0 { + last.Close() + multiWC(writers...).Close() + } + return nil, nil, err + } + + // hit + if w == nil { + if len(writers) > 0 { + go func(r io.ReadCloser) { + wc := multiWC(writers...) + defer r.Close() + defer wc.Close() + io.Copy(wc, r) + }(r) + return last, nil, nil + } + return r, nil, nil + } + + // miss + writers = append(writers, w) + + if i == len(l.layers)-1 { + if last != nil { + last.Close() + } + return r, multiWC(writers...), nil + } + + if last != nil { + last.Close() + } + last = r + } + + return nil, nil, errors.New("no caches") +} + +func (l *layeredCache) Remove(key string) error { + var grp sync.WaitGroup + // walk upwards so that lower layers don't + // restore upper layers on Get() + for i := len(l.layers) - 1; i >= 0; i-- { + grp.Add(1) + go func(layer Cache) { + defer grp.Done() + layer.Remove(key) + }(l.layers[i]) + } + grp.Wait() + return nil +} + +func (l *layeredCache) Exists(key string) bool { + for _, layer := range l.layers { + if layer.Exists(key) { + return true + } + } + return false +} + +func (l *layeredCache) Clean() (err error) { + for _, layer := range l.layers { + er := layer.Clean() + if er != nil { + err = er + } + } + return nil +} + +func multiWC(wc ...io.WriteCloser) io.WriteCloser { + if len(wc) == 0 { + return nil + } + + return &multiWriteCloser{ + writers: wc, + } +} + +type multiWriteCloser struct { + writers []io.WriteCloser +} + +func (t *multiWriteCloser) Write(p []byte) (n int, err error) { + for _, w := range t.writers { + n, err = w.Write(p) + if err != nil { + return + } + } + return len(p), nil +} + +func (t *multiWriteCloser) Close() error { + for _, w := range t.writers { + w.Close() + } + return nil +} diff --git a/vendor/github.com/djherbis/fscache/memfs.go b/vendor/github.com/djherbis/fscache/memfs.go new file mode 100644 index 000000000..cfe7e0def --- /dev/null +++ b/vendor/github.com/djherbis/fscache/memfs.go @@ -0,0 +1,133 @@ +package fscache + +import ( + "bytes" + "errors" + "io" + "sync" + "time" + + "gopkg.in/djherbis/stream.v1" +) + +type memFS struct { + mu sync.RWMutex + files map[string]*memFile +} + +// NewMemFs creates an in-memory FileSystem. +// It does not support persistence (Reload is a nop). +func NewMemFs() FileSystem { + return &memFS{ + files: make(map[string]*memFile), + } +} + +func (fs *memFS) Reload(add func(key, name string)) error { + return nil +} + +func (fs *memFS) AccessTimes(name string) (rt, wt time.Time, err error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + f, ok := fs.files[name] + if ok { + return f.rt, f.wt, nil + } + return rt, wt, errors.New("file has not been read") +} + +func (fs *memFS) Create(key string) (stream.File, error) { + fs.mu.Lock() + defer fs.mu.Unlock() + if _, ok := fs.files[key]; ok { + return nil, errors.New("file exists") + } + file := &memFile{ + name: key, + r: bytes.NewBuffer(nil), + wt: time.Now(), + } + file.memReader.memFile = file + fs.files[key] = file + return file, nil +} + +func (fs *memFS) Open(name string) (stream.File, error) { + fs.mu.Lock() + defer fs.mu.Unlock() + if f, ok := fs.files[name]; ok { + f.rt = time.Now() + return &memReader{memFile: f}, nil + } + return nil, errors.New("file does not exist") +} + +func (fs *memFS) Remove(key string) error { + fs.mu.Lock() + defer fs.mu.Unlock() + delete(fs.files, key) + return nil +} + +func (fs *memFS) RemoveAll() error { + fs.mu.Lock() + defer fs.mu.Unlock() + fs.files = make(map[string]*memFile) + return nil +} + +type memFile struct { + mu sync.RWMutex + name string + r *bytes.Buffer + memReader + rt, wt time.Time +} + +func (f *memFile) Name() string { + return f.name +} + +func (f *memFile) Write(p []byte) (int, error) { + if len(p) > 0 { + f.mu.Lock() + defer f.mu.Unlock() + return f.r.Write(p) + } + return len(p), nil +} + +func (f *memFile) Bytes() []byte { + f.mu.RLock() + defer f.mu.RUnlock() + return f.r.Bytes() +} + +func (f *memFile) Close() error { + return nil +} + +type memReader struct { + *memFile + n int +} + +func (r *memReader) ReadAt(p []byte, off int64) (n int, err error) { + data := r.Bytes() + if int64(len(data)) < off { + return 0, io.EOF + } + n, err = bytes.NewReader(data[off:]).ReadAt(p, 0) + return n, err +} + +func (r *memReader) Read(p []byte) (n int, err error) { + n, err = bytes.NewReader(r.Bytes()[r.n:]).Read(p) + r.n += n + return n, err +} + +func (r *memReader) Close() error { + return nil +} diff --git a/vendor/github.com/djherbis/fscache/reaper.go b/vendor/github.com/djherbis/fscache/reaper.go new file mode 100644 index 000000000..601e02cb4 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/reaper.go @@ -0,0 +1,37 @@ +package fscache + +import "time" + +// Reaper is used to control when streams expire from the cache. +// It is called once right after loading, and then it is run +// again after every Next() period of time. +type Reaper interface { + // Returns the amount of time to wait before the next scheduled Reaping. + Next() time.Duration + + // Given a key and the last r/w times of a file, return true + // to remove the file from the cache, false to keep it. + Reap(key string, lastRead, lastWrite time.Time) bool +} + +// NewReaper returns a simple reaper which runs every "period" +// and reaps files which are older than "expiry". +func NewReaper(expiry, period time.Duration) Reaper { + return &reaper{ + expiry: expiry, + period: period, + } +} + +type reaper struct { + period time.Duration + expiry time.Duration +} + +func (g *reaper) Next() time.Duration { + return g.period +} + +func (g *reaper) Reap(key string, lastRead, lastWrite time.Time) bool { + return lastRead.Before(time.Now().Add(-g.expiry)) +} diff --git a/vendor/github.com/djherbis/fscache/server.go b/vendor/github.com/djherbis/fscache/server.go new file mode 100644 index 000000000..dba74aad3 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/server.go @@ -0,0 +1,206 @@ +package fscache + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" +) + +// ListenAndServe hosts a Cache for access via NewRemote +func ListenAndServe(c Cache, addr string) error { + return (&server{c: c}).ListenAndServe(addr) +} + +// NewRemote returns a Cache run via ListenAndServe +func NewRemote(raddr string) Cache { + return &remote{raddr: raddr} +} + +type server struct { + c Cache +} + +func (s *server) ListenAndServe(addr string) error { + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + for { + c, err := l.Accept() + if err != nil { + return err + } + + go s.Serve(c) + } +} + +const ( + actionGet = iota + actionRemove = iota + actionExists = iota + actionClean = iota +) + +func getKey(r io.Reader) string { + dec := newDecoder(r) + buf := bytes.NewBufferString("") + io.Copy(buf, dec) + return buf.String() +} + +func sendKey(w io.Writer, key string) { + enc := newEncoder(w) + enc.Write([]byte(key)) + enc.Close() +} + +func (s *server) Serve(c net.Conn) { + var action int + fmt.Fscanf(c, "%d\n", &action) + + switch action { + case actionGet: + s.get(c, getKey(c)) + case actionRemove: + s.c.Remove(getKey(c)) + case actionExists: + s.exists(c, getKey(c)) + case actionClean: + s.c.Clean() + } +} + +func (s *server) exists(c net.Conn, key string) { + if s.c.Exists(key) { + fmt.Fprintf(c, "%d\n", 1) + } else { + fmt.Fprintf(c, "%d\n", 0) + } +} + +func (s *server) get(c net.Conn, key string) { + r, w, err := s.c.Get(key) + if err != nil { + return // handle this better + } + defer r.Close() + + if w != nil { + go func() { + fmt.Fprintf(c, "%d\n", 1) + io.Copy(w, newDecoder(c)) + w.Close() + }() + } else { + fmt.Fprintf(c, "%d\n", 0) + } + + enc := newEncoder(c) + io.Copy(enc, r) + enc.Close() +} + +type remote struct { + raddr string +} + +func (rmt *remote) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) { + c, err := net.Dial("tcp", rmt.raddr) + if err != nil { + return nil, nil, err + } + fmt.Fprintf(c, "%d\n", actionGet) + sendKey(c, key) + + var i int + fmt.Fscanf(c, "%d\n", &i) + + var ch chan struct{} + + switch i { + case 0: + ch = make(chan struct{}) // close net.Conn on reader close + case 1: + ch = make(chan struct{}, 1) // two closes before net.Conn close + + w = &safeCloser{ + c: c, + ch: ch, + w: newEncoder(c), + } + default: + return nil, nil, errors.New("bad bad bad") + } + + r = &safeCloser{ + c: c, + ch: ch, + r: newDecoder(c), + } + + return r, w, nil +} + +type safeCloser struct { + c net.Conn + ch chan<- struct{} + r ReadAtCloser + w io.WriteCloser +} + +func (s *safeCloser) ReadAt(p []byte, off int64) (int, error) { + return s.r.ReadAt(p, off) +} +func (s *safeCloser) Read(p []byte) (int, error) { return s.r.Read(p) } +func (s *safeCloser) Write(p []byte) (int, error) { return s.w.Write(p) } + +// Close only closes the underlying connection when ch is full. +func (s *safeCloser) Close() (err error) { + if s.r != nil { + err = s.r.Close() + } else if s.w != nil { + err = s.w.Close() + } + + select { + case s.ch <- struct{}{}: + return err + default: + return s.c.Close() + } +} + +func (rmt *remote) Exists(key string) bool { + c, err := net.Dial("tcp", rmt.raddr) + if err != nil { + return false + } + fmt.Fprintf(c, "%d\n", actionExists) + sendKey(c, key) + var i int + fmt.Fscanf(c, "%d\n", &i) + return i == 1 +} + +func (rmt *remote) Remove(key string) error { + c, err := net.Dial("tcp", rmt.raddr) + if err != nil { + return err + } + fmt.Fprintf(c, "%d\n", actionRemove) + sendKey(c, key) + return nil +} + +func (rmt *remote) Clean() error { + c, err := net.Dial("tcp", rmt.raddr) + if err != nil { + return err + } + fmt.Fprintf(c, "%d\n", actionClean) + return nil +} diff --git a/vendor/github.com/djherbis/fscache/stream.go b/vendor/github.com/djherbis/fscache/stream.go new file mode 100644 index 000000000..9cccb2483 --- /dev/null +++ b/vendor/github.com/djherbis/fscache/stream.go @@ -0,0 +1,72 @@ +package fscache + +import ( + "encoding/json" + "errors" + "io" +) + +type decoder interface { + Decode(interface{}) error +} + +type encoder interface { + Encode(interface{}) error +} + +type pktReader struct { + dec decoder +} + +type pktWriter struct { + enc encoder +} + +type packet struct { + Err int + Data []byte +} + +const eof = 1 + +func (t *pktReader) ReadAt(p []byte, off int64) (n int, err error) { + // TODO not implemented + return 0, errors.New("not implemented") +} + +func (t *pktReader) Read(p []byte) (int, error) { + var pkt packet + err := t.dec.Decode(&pkt) + if err != nil { + return 0, err + } + if pkt.Err == eof { + return 0, io.EOF + } + return copy(p, pkt.Data), nil +} + +func (t *pktReader) Close() error { + return nil +} + +func (t *pktWriter) Write(p []byte) (int, error) { + pkt := packet{Data: p} + err := t.enc.Encode(pkt) + if err != nil { + return 0, err + } + return len(p), nil +} + +func (t *pktWriter) Close() error { + return t.enc.Encode(packet{Err: eof}) +} + +func newEncoder(w io.Writer) io.WriteCloser { + return &pktWriter{enc: json.NewEncoder(w)} +} + +func newDecoder(r io.Reader) ReadAtCloser { + return &pktReader{dec: json.NewDecoder(r)} +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/LICENSE b/vendor/gopkg.in/djherbis/atime.v1/LICENSE new file mode 100644 index 000000000..1e7b7cc09 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Dustin H + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/gopkg.in/djherbis/atime.v1/README.md b/vendor/gopkg.in/djherbis/atime.v1/README.md new file mode 100644 index 000000000..a96873ca7 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/README.md @@ -0,0 +1,39 @@ +atime +========== + +[![GoDoc](https://godoc.org/github.com/djherbis/atime?status.svg)](https://godoc.org/github.com/djherbis/atime) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt) +[![Build Status](https://travis-ci.org/djherbis/atime.svg?branch=master)](https://travis-ci.org/djherbis/atime) +[![Coverage Status](https://coveralls.io/repos/djherbis/atime/badge.svg?branch=master)](https://coveralls.io/r/djherbis/atime?branch=master) + +Usage +------------ +File Access Times for #golang + +Looking for ctime or btime? Checkout https://github.com/djherbis/times + +Go has a hidden atime function for most platforms, this repo makes it accessible. + +```go +package main + +import ( + "log" + + "github.com/djherbis/atime" +) + +func main() { + at, err := atime.Stat("myfile") + if err != nil { + log.Fatal(err.Error()) + } + log.Println(at) +} +``` + +Installation +------------ +```sh +go get github.com/djherbis/atime +``` diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_darwin.go b/vendor/gopkg.in/djherbis/atime.v1/atime_darwin.go new file mode 100644 index 000000000..ccf7ebc30 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_darwin.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_darwin.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_dragonfly.go b/vendor/gopkg.in/djherbis/atime.v1/atime_dragonfly.go new file mode 100644 index 000000000..cd7619e6c --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_dragonfly.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_dragonfly.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_freebsd.go b/vendor/gopkg.in/djherbis/atime.v1/atime_freebsd.go new file mode 100644 index 000000000..ec7bb8b5d --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_freebsd.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_freebsd.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_linux.go b/vendor/gopkg.in/djherbis/atime.v1/atime_linux.go new file mode 100644 index 000000000..b8827bb3e --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_linux.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_linux.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_nacl.go b/vendor/gopkg.in/djherbis/atime.v1/atime_nacl.go new file mode 100644 index 000000000..ed257513a --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_nacl.go @@ -0,0 +1,22 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_nacl.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(sec, nsec int64) time.Time { + return time.Unix(sec, nsec) +} + +func atime(fi os.FileInfo) time.Time { + st := fi.Sys().(*syscall.Stat_t) + return timespecToTime(st.Atime, st.AtimeNsec) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_netbsd.go b/vendor/gopkg.in/djherbis/atime.v1/atime_netbsd.go new file mode 100644 index 000000000..6919d05a5 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_netbsd.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_netbsd.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_openbsd.go b/vendor/gopkg.in/djherbis/atime.v1/atime_openbsd.go new file mode 100644 index 000000000..3188a0738 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_openbsd.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_openbsd.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_plan9.go b/vendor/gopkg.in/djherbis/atime.v1/atime_plan9.go new file mode 100644 index 000000000..1b3bb972a --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_plan9.go @@ -0,0 +1,16 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_plan9.go + +package atime + +import ( + "os" + "time" +) + +func atime(fi os.FileInfo) time.Time { + return time.Unix(int64(fi.Sys().(*syscall.Dir).Atime), 0) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_solaris.go b/vendor/gopkg.in/djherbis/atime.v1/atime_solaris.go new file mode 100644 index 000000000..28175a7dd --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_solaris.go @@ -0,0 +1,21 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_solaris.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/atime_windows.go b/vendor/gopkg.in/djherbis/atime.v1/atime_windows.go new file mode 100644 index 000000000..8a15146fd --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/atime_windows.go @@ -0,0 +1,17 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// http://golang.org/src/os/stat_windows.go + +package atime + +import ( + "os" + "syscall" + "time" +) + +func atime(fi os.FileInfo) time.Time { + return time.Unix(0, fi.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds()) +} diff --git a/vendor/gopkg.in/djherbis/atime.v1/stat.go b/vendor/gopkg.in/djherbis/atime.v1/stat.go new file mode 100644 index 000000000..eb658e144 --- /dev/null +++ b/vendor/gopkg.in/djherbis/atime.v1/stat.go @@ -0,0 +1,21 @@ +// Package atime provides a platform-independent way to get atimes for files. +package atime + +import ( + "os" + "time" +) + +// Get returns the Last Access Time for the given FileInfo +func Get(fi os.FileInfo) time.Time { + return atime(fi) +} + +// Stat returns the Last Access Time for the given filename +func Stat(name string) (time.Time, error) { + fi, err := os.Stat(name) + if err != nil { + return time.Time{}, err + } + return atime(fi), nil +} diff --git a/vendor/gopkg.in/djherbis/stream.v1/LICENSE b/vendor/gopkg.in/djherbis/stream.v1/LICENSE new file mode 100644 index 000000000..1e7b7cc09 --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Dustin H + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/gopkg.in/djherbis/stream.v1/README.md b/vendor/gopkg.in/djherbis/stream.v1/README.md new file mode 100644 index 000000000..d6034d8e4 --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/README.md @@ -0,0 +1,80 @@ +stream +========== + +[![GoDoc](https://godoc.org/github.com/djherbis/stream?status.svg)](https://godoc.org/github.com/djherbis/stream) +[![Release](https://img.shields.io/github/release/djherbis/stream.svg)](https://github.com/djherbis/stream/releases/latest) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt) +[![Build Status](https://travis-ci.org/djherbis/stream.svg?branch=master)](https://travis-ci.org/djherbis/stream) +[![Coverage Status](https://coveralls.io/repos/djherbis/stream/badge.svg?branch=master)](https://coveralls.io/r/djherbis/stream?branch=master) + +Usage +------------ + +Write and Read concurrently, and independently. + +To explain further, if you need to write to multiple places you can use io.MultiWriter, +if you need multiple Readers on something you can use io.TeeReader. If you want concurrency you can use io.Pipe(). + +However all of these methods "tie" each Read/Write together, your readers can't read from different places in the stream, each write must be distributed to all readers in sequence. + +This package provides a way for multiple Readers to read off the same Writer, without waiting for the others. This is done by writing to a "File" interface which buffers the input so it can be read at any time from many independent readers. Readers can even be created while writing or after the stream is closed. They will all see a consistent view of the stream and will block until the section of the stream they request is written, all while being unaffected by the actions of the other readers. + +The use case for this stems from my other project djherbis/fscache. I needed a byte caching mechanism which allowed many independent clients to have access to the data while it was being written, rather than re-generating the byte stream for each of them or waiting for a complete copy of the stream which could be stored and then re-used. + +```go +import( + "io" + "log" + "os" + "time" + + "github.com/djherbis/stream" +) + +func main(){ + w, err := stream.New("mystream") + if err != nil { + log.Fatal(err) + } + + go func(){ + io.WriteString(w, "Hello World!") + <-time.After(time.Second) + io.WriteString(w, "Streaming updates...") + w.Close() + }() + + waitForReader := make(chan struct{}) + go func(){ + // Read from the stream + r, err := w.NextReader() + if err != nil { + log.Fatal(err) + } + io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates... + r.Close() + close(waitForReader) + }() + + // Full copy of the stream! + r, err := w.NextReader() + if err != nil { + log.Fatal(err) + } + io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates... + + // r supports io.ReaderAt too. + p := make([]byte, 4) + r.ReadAt(p, 1) // Read "ello" into p + + r.Close() + + <-waitForReader // don't leave main before go-routine finishes +} +``` + +Installation +------------ +```sh +go get github.com/djherbis/stream +``` diff --git a/vendor/gopkg.in/djherbis/stream.v1/fs.go b/vendor/gopkg.in/djherbis/stream.v1/fs.go new file mode 100644 index 000000000..fe808bf0d --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/fs.go @@ -0,0 +1,39 @@ +package stream + +import ( + "io" + "os" +) + +// File is a backing data-source for a Stream. +type File interface { + Name() string // The name used to Create/Open the File + io.Reader // Reader must continue reading after EOF on subsequent calls after more Writes. + io.ReaderAt // Similarly to Reader + io.Writer // Concurrent reading/writing must be supported. + io.Closer // Close should do any cleanup when done with the File. +} + +// FileSystem is used to manage Files +type FileSystem interface { + Create(name string) (File, error) // Create must return a new File for Writing + Open(name string) (File, error) // Open must return an existing File for Reading + Remove(name string) error // Remove deletes an existing File +} + +// StdFileSystem is backed by the os package. +var StdFileSystem FileSystem = stdFS{} + +type stdFS struct{} + +func (fs stdFS) Create(name string) (File, error) { + return os.Create(name) +} + +func (fs stdFS) Open(name string) (File, error) { + return os.Open(name) +} + +func (fs stdFS) Remove(name string) error { + return os.Remove(name) +} diff --git a/vendor/gopkg.in/djherbis/stream.v1/memfs.go b/vendor/gopkg.in/djherbis/stream.v1/memfs.go new file mode 100644 index 000000000..b4432ae3a --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/memfs.go @@ -0,0 +1,107 @@ +package stream + +import ( + "bytes" + "errors" + "io" + "sync" +) + +// ErrNotFoundInMem is returned when an in-memory FileSystem cannot find a file. +var ErrNotFoundInMem = errors.New("not found") + +type memfs struct { + mu sync.RWMutex + files map[string]*memFile +} + +// NewMemFS returns a New in-memory FileSystem +func NewMemFS() FileSystem { + return &memfs{ + files: make(map[string]*memFile), + } +} + +func (fs *memfs) Create(key string) (File, error) { + fs.mu.Lock() + defer fs.mu.Unlock() + + file := &memFile{ + name: key, + r: bytes.NewBuffer(nil), + } + file.memReader.memFile = file + fs.files[key] = file + return file, nil +} + +func (fs *memfs) Open(key string) (File, error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + if f, ok := fs.files[key]; ok { + return &memReader{memFile: f}, nil + } + return nil, ErrNotFoundInMem +} + +func (fs *memfs) Remove(key string) error { + fs.mu.Lock() + defer fs.mu.Unlock() + delete(fs.files, key) + return nil +} + +type memFile struct { + mu sync.RWMutex + name string + r *bytes.Buffer + memReader +} + +func (f *memFile) Name() string { + return f.name +} + +func (f *memFile) Write(p []byte) (int, error) { + if len(p) > 0 { + f.mu.Lock() + defer f.mu.Unlock() + return f.r.Write(p) + } + return len(p), nil +} + +func (f *memFile) Bytes() []byte { + f.mu.RLock() + defer f.mu.RUnlock() + return f.r.Bytes() +} + +func (f *memFile) Close() error { + return nil +} + +type memReader struct { + *memFile + n int +} + +func (r *memReader) ReadAt(p []byte, off int64) (n int, err error) { + data := r.Bytes() + if int64(len(data)) < off { + return 0, io.EOF + } + n, err = bytes.NewReader(data[off:]).ReadAt(p, 0) + return n, err +} + +func (r *memReader) Read(p []byte) (n int, err error) { + n, err = bytes.NewReader(r.Bytes()[r.n:]).Read(p) + r.n += n + return n, err +} + +func (r *memReader) Close() error { + return nil +} diff --git a/vendor/gopkg.in/djherbis/stream.v1/reader.go b/vendor/gopkg.in/djherbis/stream.v1/reader.go new file mode 100644 index 000000000..83212708b --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/reader.go @@ -0,0 +1,82 @@ +package stream + +import "io" + +// Reader is a concurrent-safe Stream Reader. +type Reader struct { + s *Stream + file File +} + +// Name returns the name of the underlying File in the FileSystem. +func (r *Reader) Name() string { return r.file.Name() } + +// ReadAt lets you Read from specific offsets in the Stream. +// ReadAt blocks while waiting for the requested section of the Stream to be written, +// unless the Stream is closed in which case it will always return immediately. +func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) { + r.s.b.RLock() + defer r.s.b.RUnlock() + + var m int + + for { + + m, err = r.file.ReadAt(p[n:], off+int64(n)) + n += m + + if r.s.b.IsOpen() { + + switch { + case n != 0 && err == nil: + return n, err + case err == io.EOF: + r.s.b.Wait() + case err != nil: + return n, err + } + + } else { + return n, err + } + + } +} + +// Read reads from the Stream. If the end of an open Stream is reached, Read +// blocks until more data is written or the Stream is Closed. +func (r *Reader) Read(p []byte) (n int, err error) { + r.s.b.RLock() + defer r.s.b.RUnlock() + + var m int + + for { + + m, err = r.file.Read(p[n:]) + n += m + + if r.s.b.IsOpen() { + + switch { + case n != 0 && err == nil: + return n, err + case err == io.EOF: + r.s.b.Wait() + case err != nil: + return n, err + } + + } else { + return n, err + } + + } +} + +// Close closes this Reader on the Stream. This must be called when done with the +// Reader or else the Stream cannot be Removed. +func (r *Reader) Close() error { + defer r.s.dec() + return r.file.Close() +} diff --git a/vendor/gopkg.in/djherbis/stream.v1/stream.go b/vendor/gopkg.in/djherbis/stream.v1/stream.go new file mode 100644 index 000000000..a0b3e1a79 --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/stream.go @@ -0,0 +1,92 @@ +// Package stream provides a way to read and write to a synchronous buffered pipe, with multiple reader support. +package stream + +import ( + "errors" + "sync" +) + +// ErrRemoving is returned when requesting a Reader on a Stream which is being Removed. +var ErrRemoving = errors.New("cannot open a new reader while removing file") + +// Stream is used to concurrently Write and Read from a File. +type Stream struct { + grp sync.WaitGroup + b *broadcaster + file File + fs FileSystem + removing chan struct{} +} + +// New creates a new Stream from the StdFileSystem with Name "name". +func New(name string) (*Stream, error) { + return NewStream(name, StdFileSystem) +} + +// NewStream creates a new Stream with Name "name" in FileSystem fs. +func NewStream(name string, fs FileSystem) (*Stream, error) { + f, err := fs.Create(name) + sf := &Stream{ + file: f, + fs: fs, + b: newBroadcaster(), + removing: make(chan struct{}), + } + sf.inc() + return sf, err +} + +// Name returns the name of the underlying File in the FileSystem. +func (s *Stream) Name() string { return s.file.Name() } + +// Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods. +func (s *Stream) Write(p []byte) (int, error) { + defer s.b.Broadcast() + s.b.Lock() + defer s.b.Unlock() + return s.file.Write(p) +} + +// Close will close the active stream. This will cause Readers to return EOF once they have +// read the entire stream. +func (s *Stream) Close() error { + defer s.dec() + defer s.b.Close() + s.b.Lock() + defer s.b.Unlock() + return s.file.Close() +} + +// Remove will block until the Stream and all its Readers have been Closed, +// at which point it will delete the underlying file. NextReader() will return +// ErrRemoving if called after Remove. +func (s *Stream) Remove() error { + close(s.removing) + s.grp.Wait() + return s.fs.Remove(s.file.Name()) +} + +// NextReader will return a concurrent-safe Reader for this stream. Each Reader will +// see a complete and independent view of the stream, and can Read will the stream +// is written to. +func (s *Stream) NextReader() (*Reader, error) { + s.inc() + + select { + case <-s.removing: + s.dec() + return nil, ErrRemoving + default: + } + + file, err := s.fs.Open(s.file.Name()) + if err != nil { + s.dec() + return nil, err + } + + return &Reader{file: file, s: s}, nil +} + +func (s *Stream) inc() { s.grp.Add(1) } +func (s *Stream) dec() { s.grp.Done() } diff --git a/vendor/gopkg.in/djherbis/stream.v1/sync.go b/vendor/gopkg.in/djherbis/stream.v1/sync.go new file mode 100644 index 000000000..26096ed9f --- /dev/null +++ b/vendor/gopkg.in/djherbis/stream.v1/sync.go @@ -0,0 +1,34 @@ +package stream + +import ( + "sync" + "sync/atomic" +) + +type broadcaster struct { + sync.RWMutex + closed uint32 + *sync.Cond +} + +func newBroadcaster() *broadcaster { + var b broadcaster + b.Cond = sync.NewCond(b.RWMutex.RLocker()) + return &b +} + +func (b *broadcaster) Wait() { + if b.IsOpen() { + b.Cond.Wait() + } +} + +func (b *broadcaster) IsOpen() bool { + return atomic.LoadUint32(&b.closed) == 0 +} + +func (b *broadcaster) Close() error { + atomic.StoreUint32(&b.closed, 1) + b.Cond.Broadcast() + return nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index de9f68474..e3d73904a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -23,6 +23,11 @@ "revision": "c1da56349675b292d3200463e2c88b9aa5e02391", "revisionTime": "2015-09-04T14:24:56-07:00" }, + { + "path": "github.com/djherbis/fscache", + "revision": "ffc728270b01f3906c396bbe796232b87750f24e", + "revisionTime": "2016-03-05T10:30:05-08:00" + }, { "path": "github.com/docker/docker/pkg/stdcopy", "revision": "9356c76d9f6e285e71f04df33ef7870455a42775", @@ -216,6 +221,16 @@ "revision": "8a57ed94ffd43444c0879fe75701732a38afc985", "revisionTime": "2015-12-29T21:02:54-07:00" }, + { + "path": "gopkg.in/djherbis/atime.v1", + "revision": "8e47e0e01d08df8b9f840d74299c8ab70a024a30", + "revisionTime": "2015-08-29T00:19:25-07:00" + }, + { + "path": "gopkg.in/djherbis/stream.v1", + "revision": "26a761059928627ca84837000dfb33447c66a146", + "revisionTime": "2016-02-03T22:24:40-08:00" + }, { "path": "gopkg.in/go-playground/validator.v8", "revision": "014792cf3e266caff1e916876be12282b33059e0",