Update codeberg.org/gruf libraries and fix go-store issue (#347)

* update codeberg.org/gruf/ libraries

Signed-off-by: kim <grufwub@gmail.com>

* another update

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2021-12-20 09:35:32 +00:00 committed by GitHub
parent 86e8e7fd21
commit 635ad2a42f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 366 additions and 197 deletions

8
go.mod
View file

@ -3,7 +3,7 @@ module github.com/superseriousbusiness/gotosocial
go 1.17 go 1.17
require ( require (
codeberg.org/gruf/go-store v1.1.2 codeberg.org/gruf/go-store v1.1.5
github.com/ReneKroon/ttlcache v1.7.0 github.com/ReneKroon/ttlcache v1.7.0
github.com/buckket/go-blurhash v1.1.0 github.com/buckket/go-blurhash v1.1.0
github.com/coreos/go-oidc/v3 v3.1.0 github.com/coreos/go-oidc/v3 v3.1.0
@ -45,12 +45,12 @@ require (
require ( require (
codeberg.org/gruf/go-bytes v1.0.2 // indirect codeberg.org/gruf/go-bytes v1.0.2 // indirect
codeberg.org/gruf/go-errors v1.0.3 // indirect codeberg.org/gruf/go-errors v1.0.4 // indirect
codeberg.org/gruf/go-fastpath v1.0.2 // indirect codeberg.org/gruf/go-fastpath v1.0.2 // indirect
codeberg.org/gruf/go-hashenc v1.0.1 // indirect codeberg.org/gruf/go-hashenc v1.0.1 // indirect
codeberg.org/gruf/go-logger v1.3.1 // indirect codeberg.org/gruf/go-logger v1.3.2 // indirect
codeberg.org/gruf/go-mutexes v1.0.1 // indirect codeberg.org/gruf/go-mutexes v1.0.1 // indirect
codeberg.org/gruf/go-nowish v1.0.2 // indirect codeberg.org/gruf/go-nowish v1.1.0 // indirect
codeberg.org/gruf/go-pools v1.0.2 // indirect codeberg.org/gruf/go-pools v1.0.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect github.com/aymerick/douceur v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect

14
go.sum
View file

@ -51,25 +51,27 @@ codeberg.org/gruf/go-bytes v1.0.1/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9
codeberg.org/gruf/go-bytes v1.0.2 h1:malqE42Ni+h1nnYWBUAJaDDtEzF4aeN4uPN8DfMNNvo= codeberg.org/gruf/go-bytes v1.0.2 h1:malqE42Ni+h1nnYWBUAJaDDtEzF4aeN4uPN8DfMNNvo=
codeberg.org/gruf/go-bytes v1.0.2/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9Ekx39cg= codeberg.org/gruf/go-bytes v1.0.2/go.mod h1:1v/ibfaosfXSZtRdW2rWaVrDXMc9E3bsi/M9Ekx39cg=
codeberg.org/gruf/go-cache v1.1.2/go.mod h1:/Dbc+xU72Op3hMn6x2PXF3NE9uIDFeS+sXPF00hN/7o= codeberg.org/gruf/go-cache v1.1.2/go.mod h1:/Dbc+xU72Op3hMn6x2PXF3NE9uIDFeS+sXPF00hN/7o=
codeberg.org/gruf/go-errors v1.0.3 h1:R0Scg9hStLejjN7+x7IWKn5I3nOI+y7J0Oc2gBcUpDY= codeberg.org/gruf/go-errors v1.0.4 h1:jOJCn/GMb6ELLRVlnmpimGRC2CbTreH5/CBZNWh9GZA=
codeberg.org/gruf/go-errors v1.0.3/go.mod h1:rJ08LdIE79Jg8vZ2TGylz/I+tZ1UuMJkGK5mNambIfQ= codeberg.org/gruf/go-errors v1.0.4/go.mod h1:rJ08LdIE79Jg8vZ2TGylz/I+tZ1UuMJkGK5mNambIfQ=
codeberg.org/gruf/go-fastpath v1.0.1/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI= codeberg.org/gruf/go-fastpath v1.0.1/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI=
codeberg.org/gruf/go-fastpath v1.0.2 h1:O3nuYPMXnN89dsgAwVFU5iCGINtPJdITWmbRe2an/iQ= codeberg.org/gruf/go-fastpath v1.0.2 h1:O3nuYPMXnN89dsgAwVFU5iCGINtPJdITWmbRe2an/iQ=
codeberg.org/gruf/go-fastpath v1.0.2/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI= codeberg.org/gruf/go-fastpath v1.0.2/go.mod h1:edveE/Kp3Eqi0JJm0lXYdkVrB28cNUkcb/bRGFTPqeI=
codeberg.org/gruf/go-hashenc v1.0.1 h1:EBvNe2wW8IPMUqT1XihB6/IM6KMJDLMFBxIUvmsy1f8= codeberg.org/gruf/go-hashenc v1.0.1 h1:EBvNe2wW8IPMUqT1XihB6/IM6KMJDLMFBxIUvmsy1f8=
codeberg.org/gruf/go-hashenc v1.0.1/go.mod h1:IfHhPCVScOiYmJLqdCQT9bYVS1nxNTV4ewMUvFWDPtc= codeberg.org/gruf/go-hashenc v1.0.1/go.mod h1:IfHhPCVScOiYmJLqdCQT9bYVS1nxNTV4ewMUvFWDPtc=
codeberg.org/gruf/go-logger v1.3.1 h1:1f10GQAkVbd3gNdpfSNHOVfaTFLLS8ebuA7IRXd8n90=
codeberg.org/gruf/go-logger v1.3.1/go.mod h1:tBduUc+Yb9vqGRxY9/FB0ZlYznSteLy/KmIANo7zFjA= codeberg.org/gruf/go-logger v1.3.1/go.mod h1:tBduUc+Yb9vqGRxY9/FB0ZlYznSteLy/KmIANo7zFjA=
codeberg.org/gruf/go-logger v1.3.2 h1:/2Cg8Tmu6H10lljq/BvHE+76O2d4tDNUDwitN6YUxxk=
codeberg.org/gruf/go-logger v1.3.2/go.mod h1:q4xmTSdaxPzfndSXVF1X2xcyCVk7Nd/PIWCDs/4biMg=
codeberg.org/gruf/go-mutexes v1.0.1 h1:X9bZW74YSEplWWdCrVXAvue5ztw3w5hh+INdXTENu88= codeberg.org/gruf/go-mutexes v1.0.1 h1:X9bZW74YSEplWWdCrVXAvue5ztw3w5hh+INdXTENu88=
codeberg.org/gruf/go-mutexes v1.0.1/go.mod h1:y2hbGLkWVHhNyxBOIVsA3/y2QMm6RSrYsC3sLVZ4EXM= codeberg.org/gruf/go-mutexes v1.0.1/go.mod h1:y2hbGLkWVHhNyxBOIVsA3/y2QMm6RSrYsC3sLVZ4EXM=
codeberg.org/gruf/go-nowish v1.0.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s= codeberg.org/gruf/go-nowish v1.0.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s=
codeberg.org/gruf/go-nowish v1.0.2 h1:/y8g38x44sD8JeqBPCkzqLoe0pReR1CTF8p6jXCOG1s=
codeberg.org/gruf/go-nowish v1.0.2/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s= codeberg.org/gruf/go-nowish v1.0.2/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s=
codeberg.org/gruf/go-nowish v1.1.0 h1:rj1z0AXDhLvnxs/DazWFxYAugs6rv5vhgWJkRCgrESg=
codeberg.org/gruf/go-nowish v1.1.0/go.mod h1:70nvICNcqQ9OHpF07N614Dyk7cpL5ToWU1K1ZVCec2s=
codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos= codeberg.org/gruf/go-pools v1.0.2 h1:B0X6yoCL9FVmnvyoizb1SYRwMYPWwEJBjPnBMM5ILos=
codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4r4TAyok= codeberg.org/gruf/go-pools v1.0.2/go.mod h1:MjUV3H6IASyBeBPCyCr7wjPpSNu8E2N87LG4r4TAyok=
codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw= codeberg.org/gruf/go-runners v1.1.1/go.mod h1:9gTrmMnO3d+50C+hVzcmGBf+zTuswReS278E2EMvnmw=
codeberg.org/gruf/go-store v1.1.2 h1:yf7osOqSOlJ9WNsFdp8e6IZCfnoT6VoI66d5SuP4Nsg= codeberg.org/gruf/go-store v1.1.5 h1:fp28vzGD15OsAF51CCwi7woH+Y3vb0aMl4OFh9JSjA0=
codeberg.org/gruf/go-store v1.1.2/go.mod h1:1CVRMdBbR0drn5pwz01aDT1Yls9W66u7E5kBiP9F9jw= codeberg.org/gruf/go-store v1.1.5/go.mod h1:Q6ev500ddKghDQ8KS4IstL/W9fptDKa2T9oeHP+tXsI=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=

View file

@ -12,7 +12,7 @@ import (
var logfmt = logger.TextFormat{ var logfmt = logger.TextFormat{
Strict: false, Strict: false,
Verbose: true, Verbose: true,
MaxDepth: 10, MaxDepth: 5,
} }
// KV is a structure for setting key-value pairs in ErrorData. // KV is a structure for setting key-value pairs in ErrorData.

View file

@ -6,4 +6,8 @@ Supports logging in 2 modes:
Running without locks isn't likely to cause you any issues*, but if it does, you can wrap your `io.Writer` using `AddSafety()` when instantiating your new Logger. Even when running the benchmarks, this library has no printing issues without locks, so in most cases you'll be fine, but the safety is there if you need it. Running without locks isn't likely to cause you any issues*, but if it does, you can wrap your `io.Writer` using `AddSafety()` when instantiating your new Logger. Even when running the benchmarks, this library has no printing issues without locks, so in most cases you'll be fine, but the safety is there if you need it.
*most logging libraries advertising high speeds are likely not performing mutex locks, which is why with this library you have the option to opt-in/out of them. *most logging libraries advertising high speeds are likely not performing mutex locks, which is why with this library you have the option to opt-in/out of them.
Note there are 2 uses of the unsafe package:
- safer interface nil value checks, uses similar logic to reflect package to check if the value in the internal fat pointer is nil
- casting a byte slice to string to allow sharing of similar byte and string methods, performs same logic as `strings.Builder{}.String()`

View file

@ -15,7 +15,7 @@ var (
// startClock starts the global nowish clock. // startClock starts the global nowish clock.
func startClock() { func startClock() {
clockOnce.Do(func() { clockOnce.Do(func() {
clock.Start(time.Millisecond * 10) clock.Start(time.Millisecond * 100)
clock.SetFormat("2006-01-02 15:04:05") clock.SetFormat("2006-01-02 15:04:05")
}) })
} }

View file

@ -45,7 +45,8 @@ func (f TextFormat) fmt(buf *bytes.Buffer) format {
} }
return format{ return format{
flags: flags, flags: flags,
depth: uint16(f.MaxDepth) << 8, curd: 0,
maxd: f.MaxDepth,
buf: buf, buf: buf,
} }
} }
@ -179,7 +180,9 @@ func (f TextFormat) AppendMsgf(buf *bytes.Buffer, s string, a ...interface{}) {
// format is the object passed among the append___ formatting functions // format is the object passed among the append___ formatting functions
type format struct { type format struct {
flags uint8 // 'isKey' and 'verbose' flags flags uint8 // 'isKey' and 'verbose' flags
depth uint16 // encoded as 0b(maxDepth)(curDepth) drefs uint8 // current value deref count
curd uint8 // current depth
maxd uint8 // maximum depth
buf *bytes.Buffer // out buffer buf *bytes.Buffer // out buffer
} }
@ -191,7 +194,12 @@ const (
// AtMaxDepth returns whether format is currently at max depth. // AtMaxDepth returns whether format is currently at max depth.
func (f format) AtMaxDepth() bool { func (f format) AtMaxDepth() bool {
return uint8(f.depth) >= uint8(f.depth>>8) return f.curd >= f.maxd
}
// Derefs returns no. times current value has been dereferenced.
func (f format) Derefs() uint8 {
return f.drefs
} }
// IsKey returns whether the isKey flag is set. // IsKey returns whether the isKey flag is set.
@ -214,7 +222,9 @@ func (f format) SetIsKey(is bool) format {
} }
return format{ return format{
flags: flags, flags: flags,
depth: f.depth, drefs: f.drefs,
curd: f.curd,
maxd: f.maxd,
buf: f.buf, buf: f.buf,
} }
} }
@ -223,16 +233,37 @@ func (f format) SetIsKey(is bool) format {
func (f format) IncrDepth() format { func (f format) IncrDepth() format {
return format{ return format{
flags: f.flags, flags: f.flags,
depth: (f.depth & 0b1111111100000000) | uint16(uint8(f.depth)+1), drefs: f.drefs,
curd: f.curd + 1,
maxd: f.maxd,
buf: f.buf, buf: f.buf,
} }
} }
// IncrDerefs returns format instance with dereference count incremented.
func (f format) IncrDerefs() format {
return format{
flags: f.flags,
drefs: f.drefs + 1,
curd: f.curd,
maxd: f.maxd,
buf: f.buf,
}
}
// appendType appends a type using supplied type str.
func appendType(fmt format, t string) {
for i := uint8(0); i < fmt.Derefs(); i++ {
fmt.buf.WriteByte('*')
}
fmt.buf.WriteString(t)
}
// appendNilType writes nil to buf, type included if verbose. // appendNilType writes nil to buf, type included if verbose.
func appendNilType(fmt format, t string) { func appendNilType(fmt format, t string) {
if fmt.Verbose() { if fmt.Verbose() {
fmt.buf.WriteByte('(') fmt.buf.WriteByte('(')
fmt.buf.WriteString(t) appendType(fmt, t)
fmt.buf.WriteString(`)(nil)`) fmt.buf.WriteString(`)(nil)`)
} else { } else {
fmt.buf.WriteString(`nil`) fmt.buf.WriteString(`nil`)
@ -243,7 +274,7 @@ func appendNilType(fmt format, t string) {
func appendNilIface(fmt format, i interface{}) { func appendNilIface(fmt format, i interface{}) {
if fmt.Verbose() { if fmt.Verbose() {
fmt.buf.WriteByte('(') fmt.buf.WriteByte('(')
fmt.buf.WriteString(reflect.TypeOf(i).String()) appendType(fmt, reflect.TypeOf(i).String())
fmt.buf.WriteString(`)(nil)`) fmt.buf.WriteString(`)(nil)`)
} else { } else {
fmt.buf.WriteString(`nil`) fmt.buf.WriteString(`nil`)
@ -254,7 +285,7 @@ func appendNilIface(fmt format, i interface{}) {
func appendNilRValue(fmt format, v reflect.Value) { func appendNilRValue(fmt format, v reflect.Value) {
if fmt.Verbose() { if fmt.Verbose() {
fmt.buf.WriteByte('(') fmt.buf.WriteByte('(')
fmt.buf.WriteString(v.Type().String()) appendType(fmt, v.Type().String())
fmt.buf.WriteString(`)(nil)`) fmt.buf.WriteString(`)(nil)`)
} else { } else {
fmt.buf.WriteString(`nil`) fmt.buf.WriteString(`nil`)
@ -272,7 +303,8 @@ func appendBytes(fmt format, b []byte) {
// Values CAN be nil formatted // Values CAN be nil formatted
appendNilType(fmt, `[]byte`) appendNilType(fmt, `[]byte`)
} else { } else {
appendString(fmt, bytes.BytesToString(b)) // unsafe cast as string to prevent reallocation
appendString(fmt, *(*string)(unsafe.Pointer(&b)))
} }
} }
@ -707,7 +739,7 @@ func appendRValue(fmt format, v reflect.Value) {
if v.IsNil() { if v.IsNil() {
appendNilRValue(fmt, v) appendNilRValue(fmt, v)
} else { } else {
appendRValue(fmt, v.Elem()) appendRValue(fmt.IncrDerefs(), v.Elem())
} }
case reflect.UnsafePointer: case reflect.UnsafePointer:
fmt.buf.WriteString("(unsafe.Pointer)") fmt.buf.WriteString("(unsafe.Pointer)")

View file

@ -16,8 +16,6 @@ func Start(precision time.Duration) (*Clock, func()) {
} }
type Clock struct { type Clock struct {
noCopy noCopy //nolint noCopy because a copy will fuck with atomics
// format stores the time formatting style string // format stores the time formatting style string
format string format string
@ -35,11 +33,13 @@ type Clock struct {
now unsafe.Pointer now unsafe.Pointer
} }
// Start starts the clock with the provided precision, the // Start starts the clock with the provided precision, the returned
// returned function is the stop function for the underlying timer // function is the stop function for the underlying timer. For >= 2ms,
// actual precision is usually within AT LEAST 10% of requested precision,
// less than this and the actual precision very quickly deteriorates.
func (c *Clock) Start(precision time.Duration) func() { func (c *Clock) Start(precision time.Duration) func() {
// Create ticker from duration // Create ticker from duration
tick := time.NewTicker(precision) tick := time.NewTicker(precision / 10)
// Set initial time // Set initial time
t := time.Now() t := time.Now()
@ -63,7 +63,7 @@ func (c *Clock) Start(precision time.Duration) func() {
return tick.Stop return tick.Stop
} }
// run is the internal clock ticking loop // run is the internal clock ticking loop.
func (c *Clock) run(tick *time.Ticker) { func (c *Clock) run(tick *time.Ticker) {
for { for {
// Wait on tick // Wait on tick
@ -83,12 +83,12 @@ func (c *Clock) run(tick *time.Ticker) {
} }
} }
// Now returns a good (ish) estimate of the current 'now' time // Now returns a good (ish) estimate of the current 'now' time.
func (c *Clock) Now() time.Time { func (c *Clock) Now() time.Time {
return *(*time.Time)(atomic.LoadPointer(&c.now)) return *(*time.Time)(atomic.LoadPointer(&c.now))
} }
// NowFormat returns the formatted "now" time, cached until next tick and "now" updates // NowFormat returns the formatted "now" time, cached until next tick and "now" updates.
func (c *Clock) NowFormat() string { func (c *Clock) NowFormat() string {
// If format still valid, return this // If format still valid, return this
if atomic.LoadUint32(&c.valid) == 1 { if atomic.LoadUint32(&c.valid) == 1 {
@ -105,27 +105,18 @@ func (c *Clock) NowFormat() string {
} }
// Calculate time format // Calculate time format
b := c.Now().AppendFormat( nowfmt := c.Now().Format(c.format)
make([]byte, 0, len(c.format)),
c.format,
)
// Update the stored value and set valid! // Update the stored value and set valid!
atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&b)) atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&nowfmt))
atomic.StoreUint32(&c.valid, 1) atomic.StoreUint32(&c.valid, 1)
// Unlock and return // Unlock and return
c.mutex.Unlock() c.mutex.Unlock()
return nowfmt
// Note:
// it's safe to do this conversion here
// because this byte slice will never change.
// and we have the direct pointer to it, we're
// not requesting it atomicly via c.Format
return *(*string)(unsafe.Pointer(&b))
} }
// SetFormat sets the time format string used by .NowFormat() // SetFormat sets the time format string used by .NowFormat().
func (c *Clock) SetFormat(format string) { func (c *Clock) SetFormat(format string) {
// Get mutex lock // Get mutex lock
c.mutex.Lock() c.mutex.Lock()

View file

@ -6,113 +6,228 @@ import (
"time" "time"
) )
// Timeout provides a reusable structure for enforcing timeouts with a cancel // Timeout provides a reusable structure for enforcing timeouts with a cancel.
type Timeout struct { type Timeout struct {
noCopy noCopy //nolint noCopy because a copy will mess with atomics timer *time.Timer // timer is the underlying timeout-timer
cncl syncer // cncl is the cancel synchronization channel
tk *time.Timer // tk is the underlying timeout-timer next int64 // next is the next timeout duration to run on
ch syncer // ch is the cancel synchronization channel state uint32 // state stores the current timeout state
wg sync.WaitGroup // wg is the waitgroup to hold .Start() until timeout goroutine started mu sync.Mutex // mu protects state, and helps synchronize return of .Start()
st timeoutState // st stores the current timeout state (and protects concurrent use)
} }
// NewTimeout returns a new Timeout instance // NewTimeout returns a new Timeout instance.
func NewTimeout() Timeout { func NewTimeout() Timeout {
tk := time.NewTimer(time.Minute) timer := time.NewTimer(time.Minute)
tk.Stop() // don't keep it running timer.Stop() // don't keep it running
return Timeout{ return Timeout{
tk: tk, timer: timer,
ch: make(syncer), cncl: make(syncer),
} }
} }
func (t *Timeout) runTimeout(hook func()) { // startTimeout is the main timeout routine, handling starting the
t.wg.Add(1) // timeout runner at first and upon any time extensions, and handling
go func() { // any received cancels by stopping the running timer.
cancelled := false func (t *Timeout) startTimeout(hook func()) {
var cancelled bool
// Signal started // Receive first timeout duration
t.wg.Done() d := atomic.SwapInt64(&t.next, 0)
select { // Indicate finished starting, this
// Timeout reached // was left locked by t.start().
case <-t.tk.C: t.mu.Unlock()
if !t.st.stop() /* a sneaky cancel! */ {
t.ch.recv() for {
// Run supplied timeout
cancelled = t.runTimeout(d)
if cancelled {
break
}
// Check for extension or set timed out
d = atomic.SwapInt64(&t.next, 0)
if d < 1 {
if t.timedOut() {
// timeout reached
hook()
break
} else {
// already cancelled
t.cncl.wait()
cancelled = true cancelled = true
defer t.ch.send() break
} }
}
// Cancel called if !t.extend() {
case <-t.ch: // already cancelled
t.cncl.wait()
cancelled = true
break
}
}
if cancelled {
// Release the .Cancel()
defer t.cncl.notify()
}
// Mark as done
t.reset()
}
// runTimeout will until supplied timeout or cancel called.
func (t *Timeout) runTimeout(d int64) (cancelled bool) {
// Start the timer for 'd'
t.timer.Reset(time.Duration(d))
select {
// Timeout reached
case <-t.timer.C:
if !t.timingOut() {
// a sneaky cancel!
t.cncl.wait()
cancelled = true cancelled = true
defer t.ch.send()
} }
// Ensure timer stopped // Cancel called
if cancelled && !t.tk.Stop() { case <-t.cncl.wait():
<-t.tk.C cancelled = true
if !t.timer.Stop() {
<-t.timer.C
} }
}
// Defer reset state return cancelled
defer t.st.reset()
// If timed out call hook
if !cancelled {
hook()
}
}()
t.wg.Wait()
} }
// Start starts the timer with supplied timeout. If timeout is reached before // Start starts the timer with supplied timeout. If timeout is reached before
// cancel then supplied timeout hook will be called. Error may be called if // cancel then supplied timeout hook will be called. Panic will be called if
// Timeout is already running when this function is called // Timeout is already running when calling this function.
func (t *Timeout) Start(d time.Duration, hook func()) { func (t *Timeout) Start(d time.Duration, hook func()) {
if !t.st.start() { if !t.start() {
panic("nowish: timeout already started") t.mu.Unlock() // need to unlock
panic("timeout already started")
} }
t.runTimeout(hook)
t.tk.Reset(d) // Start the timeout
atomic.StoreInt64(&t.next, int64(d))
go t.startTimeout(hook)
// Wait until start
t.mu.Lock()
t.mu.Unlock()
}
// Extend will attempt to extend the timeout runner's time, returns false if not running.
func (t *Timeout) Extend(d time.Duration) bool {
var ok bool
if ok = t.running(); ok {
atomic.AddInt64(&t.next, int64(d))
}
return ok
} }
// Cancel cancels the currently running timer. If a cancel is achieved, then // Cancel cancels the currently running timer. If a cancel is achieved, then
// this function will return after the timeout goroutine is finished // this function will return after the timeout goroutine is finished.
func (t *Timeout) Cancel() { func (t *Timeout) Cancel() {
if !t.st.stop() { if !t.cancel() {
return return
} }
t.ch.send() t.cncl.notify()
t.ch.recv() <-t.cncl.wait()
} }
// timeoutState provides a thread-safe timeout state mechanism // possible timeout states.
type timeoutState uint32 const (
stopped = 0
started = 1
timingOut = 2
cancelled = 3
timedOut = 4
)
// start attempts to start the state, must be already reset, returns success // cas will perform a compare and swap where the compare is a provided function.
func (t *timeoutState) start() bool { func (t *Timeout) cas(check func(uint32) bool, swap uint32) bool {
return atomic.CompareAndSwapUint32((*uint32)(t), 0, 1) var cas bool
t.mu.Lock()
if cas = check(t.state); cas {
t.state = swap
}
t.mu.Unlock()
return cas
} }
// stop attempts to stop the state, must already be started, returns success // start attempts to mark the timeout state as 'started', note DOES NOT unlock Timeout.mu.
func (t *timeoutState) stop() bool { func (t *Timeout) start() bool {
return atomic.CompareAndSwapUint32((*uint32)(t), 1, 2) var ok bool
t.mu.Lock()
if ok = (t.state == stopped); ok {
t.state = started
}
// don't unlock
return ok
} }
// reset is fairly self explanatory // timingOut attempts to mark the timeout state as 'timing out'.
func (t *timeoutState) reset() { func (t *Timeout) timingOut() bool {
atomic.StoreUint32((*uint32)(t), 0) return t.cas(func(u uint32) bool {
return (u == started)
}, timingOut)
} }
// syncer provides helpful receiver methods for a synchronization channel // timedOut attempts mark the 'timing out' state as 'timed out'.
func (t *Timeout) timedOut() bool {
return t.cas(func(u uint32) bool {
return (u == timingOut)
}, timedOut)
}
// extend attempts to extend a 'timing out' state by moving it back to 'started'.
func (t *Timeout) extend() bool {
return t.cas(func(u uint32) bool {
return (u == started) ||
(u == timingOut)
}, started)
}
// running returns whether the state is anything other than 'stopped'.
func (t *Timeout) running() bool {
t.mu.Lock()
running := (t.state != stopped)
t.mu.Unlock()
return running
}
// cancel attempts to mark the timeout state as 'cancelled'.
func (t *Timeout) cancel() bool {
return t.cas(func(u uint32) bool {
return (u == started) ||
(u == timingOut)
}, cancelled)
}
// reset marks the timeout state as 'stopped'.
func (t *Timeout) reset() {
t.mu.Lock()
t.state = stopped
t.mu.Unlock()
}
// syncer provides helpful receiver methods for a synchronization channel.
type syncer (chan struct{}) type syncer (chan struct{})
// send blocks on sending an empty value down channel // notify blocks on sending an empty value down channel.
func (s syncer) send() { func (s syncer) notify() {
s <- struct{}{} s <- struct{}{}
} }
// recv blocks on receiving (and dropping) empty value from channel // wait returns the underlying channel for blocking until '.notify()'.
func (s syncer) recv() { func (s syncer) wait() <-chan struct{} {
<-s return s
} }

View file

@ -2,6 +2,7 @@ package kv
import ( import (
"io" "io"
"sync"
"codeberg.org/gruf/go-errors" "codeberg.org/gruf/go-errors"
) )
@ -15,9 +16,14 @@ var ErrStateClosed = errors.New("store/kv: state closed")
// then the state has zero guarantees // then the state has zero guarantees
type StateRO struct { type StateRO struct {
store *KVStore store *KVStore
mutex sync.RWMutex
} }
func (st *StateRO) Get(key string) ([]byte, error) { func (st *StateRO) Get(key string) ([]byte, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return nil, ErrStateClosed return nil, ErrStateClosed
@ -28,6 +34,10 @@ func (st *StateRO) Get(key string) ([]byte, error) {
} }
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return nil, ErrStateClosed return nil, ErrStateClosed
@ -38,6 +48,10 @@ func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
} }
func (st *StateRO) Has(key string) (bool, error) { func (st *StateRO) Has(key string) (bool, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return false, ErrStateClosed return false, ErrStateClosed
@ -47,8 +61,16 @@ func (st *StateRO) Has(key string) (bool, error) {
return st.store.has(key) return st.store.has(key)
} }
func (st *StateRO) close() { func (st *StateRO) Release() {
st.store = nil // Get state write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Release the store
if st.store != nil {
st.store.mutex.RUnlock()
st.store = nil
}
} }
// StateRW provides a read-write window to the store. While this // StateRW provides a read-write window to the store. While this
@ -58,9 +80,14 @@ func (st *StateRO) close() {
// then the state has zero guarantees // then the state has zero guarantees
type StateRW struct { type StateRW struct {
store *KVStore store *KVStore
mutex sync.RWMutex
} }
func (st *StateRW) Get(key string) ([]byte, error) { func (st *StateRW) Get(key string) ([]byte, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return nil, ErrStateClosed return nil, ErrStateClosed
@ -71,6 +98,10 @@ func (st *StateRW) Get(key string) ([]byte, error) {
} }
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return nil, ErrStateClosed return nil, ErrStateClosed
@ -81,6 +112,10 @@ func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
} }
func (st *StateRW) Put(key string, value []byte) error { func (st *StateRW) Put(key string, value []byte) error {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return ErrStateClosed return ErrStateClosed
@ -91,6 +126,10 @@ func (st *StateRW) Put(key string, value []byte) error {
} }
func (st *StateRW) PutStream(key string, r io.Reader) error { func (st *StateRW) PutStream(key string, r io.Reader) error {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return ErrStateClosed return ErrStateClosed
@ -101,6 +140,10 @@ func (st *StateRW) PutStream(key string, r io.Reader) error {
} }
func (st *StateRW) Has(key string) (bool, error) { func (st *StateRW) Has(key string) (bool, error) {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return false, ErrStateClosed return false, ErrStateClosed
@ -111,6 +154,10 @@ func (st *StateRW) Has(key string) (bool, error) {
} }
func (st *StateRW) Delete(key string) error { func (st *StateRW) Delete(key string) error {
// Get state read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Check not closed // Check not closed
if st.store == nil { if st.store == nil {
return ErrStateClosed return ErrStateClosed
@ -120,6 +167,14 @@ func (st *StateRW) Delete(key string) error {
return st.store.delete(key) return st.store.delete(key)
} }
func (st *StateRW) close() { func (st *StateRW) Release() {
st.store = nil // Get state write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Release the store
if st.store != nil {
st.store.mutex.Unlock()
st.store = nil
}
} }

View file

@ -212,32 +212,34 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
}, nil }, nil
} }
// Read provides a read-only window to the store, holding it in a read-locked state until // Read provides a read-only window to the store, holding it in a read-locked state until release
// the supplied function returns func (st *KVStore) Read() *StateRO {
func (st *KVStore) Read(do func(*StateRO)) {
// Get store read lock
st.mutex.RLock() st.mutex.RLock()
defer st.mutex.RUnlock() return &StateRO{store: st}
// Create new store state (defer close)
state := &StateRO{store: st}
defer state.close()
// Pass state
do(state)
} }
// Update provides a read-write window to the store, holding it in a read-write-locked state // ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
// until the supplied functions returns func (st *KVStore) ReadFn(fn func(*StateRO)) {
func (st *KVStore) Update(do func(*StateRW)) { // Acquire read-only state
// Get store lock state := st.Read()
defer state.Release()
// Pass to fn
fn(state)
}
// Update provides a read-write window to the store, holding it in a write-locked state until release
func (st *KVStore) Update() *StateRW {
st.mutex.Lock() st.mutex.Lock()
defer st.mutex.Unlock() return &StateRW{store: st}
}
// Create new store state (defer close)
state := &StateRW{store: st} // UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
defer state.close() func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Acquire read-write state
// Pass state state := st.Update()
do(state) defer state.Release()
// Pass to fn
fn(state)
} }

View file

@ -585,8 +585,8 @@ func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
// nodePathForKey calculates the node file path for supplied key // nodePathForKey calculates the node file path for supplied key
func (st *BlockStorage) nodePathForKey(key string) (string, error) { func (st *BlockStorage) nodePathForKey(key string) (string, error) {
// Path separators are illegal // Path separators are illegal, as directory paths
if strings.Contains(key, "/") { if strings.Contains(key, "/") || key == "." || key == ".." {
return "", ErrInvalidKey return "", ErrInvalidKey
} }
@ -594,6 +594,10 @@ func (st *BlockStorage) nodePathForKey(key string) (string, error) {
pb := util.GetPathBuilder() pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb) defer util.PutPathBuilder(pb)
// Append the nodepath to key
pb.AppendString(st.nodePath)
pb.AppendString(key)
// Return joined + cleaned node-path // Return joined + cleaned node-path
return pb.Join(st.nodePath, key), nil return pb.Join(st.nodePath, key), nil
} }

View file

@ -69,7 +69,6 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {
// DiskStorage is a Storage implementation that stores directly to a filesystem // DiskStorage is a Storage implementation that stores directly to a filesystem
type DiskStorage struct { type DiskStorage struct {
path string // path is the root path of this store path string // path is the root path of this store
dots int // dots is the "dotdot" count for the root store path
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store config DiskConfig // cfg is the supplied configuration for this store
} }
@ -120,7 +119,6 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Return new DiskStorage // Return new DiskStorage
return &DiskStorage{ return &DiskStorage{
path: path, path: path,
dots: util.CountDotdots(path),
bufp: pools.NewBufferPool(config.WriteBufSize), bufp: pools.NewBufferPool(config.WriteBufSize),
config: config, config: config,
}, nil }, nil
@ -282,10 +280,10 @@ func (st *DiskStorage) filepath(key string) (string, error) {
pb.AppendString(st.path) pb.AppendString(st.path)
pb.AppendString(key) pb.AppendString(key)
// If path is dir traversal, and traverses FURTHER // Check for dir traversal outside of root
// than store root, this is an error if util.IsDirTraversal(st.path, pb.StringPtr()) {
if util.CountDotdots(pb.StringPtr()) > st.dots {
return "", ErrInvalidKey return "", ErrInvalidKey
} }
return pb.String(), nil return pb.String(), nil
} }

View file

@ -2,7 +2,6 @@ package storage
import ( import (
"io" "io"
"sync"
"codeberg.org/gruf/go-bytes" "codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-store/util" "codeberg.org/gruf/go-store/util"
@ -12,14 +11,12 @@ import (
// pairs in a Go map in-memory. The map is protected by a mutex. // pairs in a Go map in-memory. The map is protected by a mutex.
type MemoryStorage struct { type MemoryStorage struct {
fs map[string][]byte fs map[string][]byte
mu sync.Mutex
} }
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'. // OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
func OpenMemory(size int) *MemoryStorage { func OpenMemory(size int) *MemoryStorage {
return &MemoryStorage{ return &MemoryStorage{
fs: make(map[string][]byte, size), fs: make(map[string][]byte, size),
mu: sync.Mutex{},
} }
} }
@ -30,33 +27,19 @@ func (st *MemoryStorage) Clean() error {
// ReadBytes implements Storage.ReadBytes(). // ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
// Safely check store
st.mu.Lock()
b, ok := st.fs[key] b, ok := st.fs[key]
st.mu.Unlock()
// Return early if not exist
if !ok { if !ok {
return nil, ErrNotFound return nil, ErrNotFound
} }
// Create return copy
return bytes.Copy(b), nil return bytes.Copy(b), nil
} }
// ReadStream implements Storage.ReadStream(). // ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// Safely check store
st.mu.Lock()
b, ok := st.fs[key] b, ok := st.fs[key]
st.mu.Unlock()
// Return early if not exist
if !ok { if !ok {
return nil, ErrNotFound return nil, ErrNotFound
} }
// Create io.ReadCloser from 'b' copy
b = bytes.Copy(b) b = bytes.Copy(b)
r := bytes.NewReader(b) r := bytes.NewReader(b)
return util.NopReadCloser(r), nil return util.NopReadCloser(r), nil
@ -64,68 +47,43 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// WriteBytes implements Storage.WriteBytes(). // WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(key string, b []byte) error { func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
// Safely check store
st.mu.Lock()
_, ok := st.fs[key] _, ok := st.fs[key]
// Check for already exist
if ok { if ok {
st.mu.Unlock()
return ErrAlreadyExists return ErrAlreadyExists
} }
// Write + unlock
st.fs[key] = bytes.Copy(b) st.fs[key] = bytes.Copy(b)
st.mu.Unlock()
return nil return nil
} }
// WriteStream implements Storage.WriteStream(). // WriteStream implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(key string, r io.Reader) error { func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
// Read all from reader
b, err := io.ReadAll(r) b, err := io.ReadAll(r)
if err != nil { if err != nil {
return err return err
} }
// Write to storage
return st.WriteBytes(key, b) return st.WriteBytes(key, b)
} }
// Stat implements Storage.Stat(). // Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(key string) (bool, error) { func (st *MemoryStorage) Stat(key string) (bool, error) {
st.mu.Lock()
_, ok := st.fs[key] _, ok := st.fs[key]
st.mu.Unlock()
return ok, nil return ok, nil
} }
// Remove implements Storage.Remove(). // Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(key string) error { func (st *MemoryStorage) Remove(key string) error {
// Safely check store
st.mu.Lock()
_, ok := st.fs[key] _, ok := st.fs[key]
// Check in store
if !ok { if !ok {
st.mu.Unlock()
return ErrNotFound return ErrNotFound
} }
// Delete + unlock
delete(st.fs, key) delete(st.fs, key)
st.mu.Unlock()
return nil return nil
} }
// WalkKeys implements Storage.WalkKeys(). // WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error { func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
// Safely walk storage keys
st.mu.Lock()
for key := range st.fs { for key := range st.fs {
opts.WalkFn(entry(key)) opts.WalkFn(entry(key))
} }
st.mu.Unlock()
return nil return nil
} }

View file

@ -9,14 +9,22 @@ import (
"codeberg.org/gruf/go-fastpath" "codeberg.org/gruf/go-fastpath"
) )
var dotdot = "../" // IsDirTraversal will check if rootPlusPath is a dir traversal outside of root,
// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath)
func IsDirTraversal(root string, rootPlusPath string) bool {
switch {
// Root is $PWD, check for traversal out of
case root == ".":
return strings.HasPrefix(rootPlusPath, "../")
// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path // The path MUST be prefixed by root
func CountDotdots(path string) int { case !strings.HasPrefix(rootPlusPath, root):
if !strings.HasSuffix(path, dotdot) { return true
return 0
// In all other cases, check not equal
default:
return len(root) == len(rootPlusPath)
} }
return strings.Count(path, dotdot)
} }
// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry // WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry

8
vendor/modules.txt vendored
View file

@ -1,7 +1,7 @@
# codeberg.org/gruf/go-bytes v1.0.2 # codeberg.org/gruf/go-bytes v1.0.2
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-bytes codeberg.org/gruf/go-bytes
# codeberg.org/gruf/go-errors v1.0.3 # codeberg.org/gruf/go-errors v1.0.4
## explicit; go 1.15 ## explicit; go 1.15
codeberg.org/gruf/go-errors codeberg.org/gruf/go-errors
# codeberg.org/gruf/go-fastpath v1.0.2 # codeberg.org/gruf/go-fastpath v1.0.2
@ -10,19 +10,19 @@ codeberg.org/gruf/go-fastpath
# codeberg.org/gruf/go-hashenc v1.0.1 # codeberg.org/gruf/go-hashenc v1.0.1
## explicit; go 1.16 ## explicit; go 1.16
codeberg.org/gruf/go-hashenc codeberg.org/gruf/go-hashenc
# codeberg.org/gruf/go-logger v1.3.1 # codeberg.org/gruf/go-logger v1.3.2
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-logger codeberg.org/gruf/go-logger
# codeberg.org/gruf/go-mutexes v1.0.1 # codeberg.org/gruf/go-mutexes v1.0.1
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-mutexes codeberg.org/gruf/go-mutexes
# codeberg.org/gruf/go-nowish v1.0.2 # codeberg.org/gruf/go-nowish v1.1.0
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-nowish codeberg.org/gruf/go-nowish
# codeberg.org/gruf/go-pools v1.0.2 # codeberg.org/gruf/go-pools v1.0.2
## explicit; go 1.16 ## explicit; go 1.16
codeberg.org/gruf/go-pools codeberg.org/gruf/go-pools
# codeberg.org/gruf/go-store v1.1.2 # codeberg.org/gruf/go-store v1.1.5
## explicit; go 1.14 ## explicit; go 1.14
codeberg.org/gruf/go-store/kv codeberg.org/gruf/go-store/kv
codeberg.org/gruf/go-store/storage codeberg.org/gruf/go-store/storage