woodpecker/stream/stream_impl.go
2016-04-12 17:43:38 -07:00

96 lines
1.9 KiB
Go

package stream
import (
"io"
"sync"
"github.com/djherbis/fscache"
)
var noexp fscache.Reaper
// New creates a new Mux using an in-memory filesystem.
func New() Mux {
fs := fscache.NewMemFs()
c, err := fscache.NewCache(fs, noexp)
if err != nil {
panic(err)
}
return &mux{c}
}
// New creates a new Mux using a persistent filesystem.
func NewFileSystem(path string) Mux {
fs, err := fscache.NewFs(path, 0777)
if err != nil {
panic(err)
}
c, err := fscache.NewCache(fs, noexp)
if err != nil {
panic(err)
}
return &mux{c}
}
// mux wraps the default fscache.Cache to match the
// defined interface and to wrap the ReadCloser and
// WriteCloser to avoid panics when we over-aggressively
// close streams.
type mux struct {
cache fscache.Cache
}
func (m *mux) Create(key string) (io.ReadCloser, io.WriteCloser, error) {
rc, wc, err := m.cache.Get(key)
if rc != nil {
rc = &closeOnceReader{ReaderAt: rc, ReadCloser: rc}
}
if wc != nil {
wc = &closeOnceWriter{WriteCloser: wc}
}
return rc, wc, err
}
func (m *mux) Open(key string) (io.ReadCloser, io.WriteCloser, error) {
return m.Create(key)
}
func (m *mux) Exists(key string) bool {
return m.cache.Exists(key)
}
func (m *mux) Remove(key string) error {
return m.cache.Remove(key)
}
// closeOnceReader is a helper function that ensures
// the reader is only closed once. This is because
// attempting to close the fscache reader more than
// once results in a panic.
type closeOnceReader struct {
io.ReaderAt
io.ReadCloser
once sync.Once
}
func (c *closeOnceReader) Close() error {
c.once.Do(func() {
c.ReadCloser.Close()
})
return nil
}
// closeOnceWriter is a helper function that ensures
// the writer is only closed once. This is because
// attempting to close the fscache writer more than
// once results in a panic.
type closeOnceWriter struct {
io.WriteCloser
once sync.Once
}
func (c *closeOnceWriter) Close() error {
c.once.Do(func() {
c.WriteCloser.Close()
})
return nil
}