Merge pull request #281 from bakins/random-replacement

Add random replacement mapper cache
Matthias Rampke 2020-03-05 10:25:15 +01:00 committed by GitHub
commit 60fbaf5e27
4 changed files with 277 additions and 62 deletions
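
The new --statsd.cache-type flag added in main.go below selects the replacement policy for the metric mapping cache; the default remains "lru". A hypothetical invocation that opts into random replacement:

    statsd_exporter --statsd.cache-type=random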

main.go

@@ -101,7 +101,7 @@ func tcpAddrFromString(addr string) (*net.TCPAddr, error) {
 	}, nil
 }

-func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger) {
+func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, syscall.SIGHUP)

@@ -112,7 +112,7 @@ func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int,
 			continue
 		}
 		level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s)
-		err := mapper.InitFromFile(fileName, cacheSize)
+		err := mapper.InitFromFile(fileName, cacheSize, option)
 		if err != nil {
 			level.Info(logger).Log("msg", "Error reloading config", "error", err)
 			configLoads.WithLabelValues("failure").Inc()

@@ -149,6 +149,7 @@ func main() {
 		mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
 		readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
 		cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
+		cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random")
 		eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events").Default("10000").Int()
 		eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing").Default("1000").Int()
 		eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()

@@ -162,6 +163,8 @@ func main() {
 	kingpin.Parse()
 	logger := promlog.New(promlogConfig)

+	cacheOption := mapper.WithCacheType(*cacheType)
+
 	if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
 		level.Error(logger).Log("At least one of UDP/TCP/Unixgram listeners must be specified.")
 		os.Exit(1)

@@ -268,7 +271,7 @@ func main() {
 	mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
 	if *mappingConfig != "" {
-		err := mapper.InitFromFile(*mappingConfig, *cacheSize)
+		err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
 		if err != nil {
 			level.Error(logger).Log("msg", "error loading config", "error", err)
 			os.Exit(1)

@@ -283,10 +286,10 @@ func main() {
 			}
 		}
 	} else {
-		mapper.InitCache(*cacheSize)
+		mapper.InitCache(*cacheSize, cacheOption)
 	}

-	go configReloader(*mappingConfig, mapper, *cacheSize, logger)
+	go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption)

 	exporter := NewExporter(mapper, logger)

(second changed file: mapper package — MetricMapper initialization)

@@ -18,12 +18,12 @@ import (
 	"io/ioutil"
 	"regexp"
 	"sync"
+	"time"

 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/log"
 	"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
 	yaml "gopkg.in/yaml.v2"
-	"time"
 )

 var (

@@ -98,7 +98,7 @@ var defaultQuantiles = []metricObjective{
 	{Quantile: 0.99, Error: 0.001},
 }

-func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) error {
+func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, options ...CacheOption) error {
 	var n MetricMapper

 	if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil {

@@ -248,7 +248,7 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) er
 	m.Defaults = n.Defaults
 	m.Mappings = n.Mappings
-	m.InitCache(cacheSize)
+	m.InitCache(cacheSize, options...)

 	if n.doFSM {
 		var mappings []string

@@ -270,20 +270,39 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) er
 	return nil
 }

-func (m *MetricMapper) InitFromFile(fileName string, cacheSize int) error {
+func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...CacheOption) error {
 	mappingStr, err := ioutil.ReadFile(fileName)
 	if err != nil {
 		return err
 	}
-	return m.InitFromYAMLString(string(mappingStr), cacheSize)
+	return m.InitFromYAMLString(string(mappingStr), cacheSize, options...)
 }

-func (m *MetricMapper) InitCache(cacheSize int) {
+func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
 	if cacheSize == 0 {
 		m.cache = NewMetricMapperNoopCache()
 	} else {
-		cache, err := NewMetricMapperCache(cacheSize)
+		o := cacheOptions{
+			cacheType: "lru",
+		}
+		for _, f := range options {
+			f(&o)
+		}
+
+		var (
+			cache MetricMapperCache
+			err   error
+		)
+
+		switch o.cacheType {
+		case "lru":
+			cache, err = NewMetricMapperCache(cacheSize)
+		case "random":
+			cache, err = NewMetricMapperRRCache(cacheSize)
+		default:
+			err = fmt.Errorf("unsupported cache type %q", o.cacheType)
+		}
+
 		if err != nil {
 			log.Fatalf("Unable to setup metric cache. Caused by: %s", err)
 		}
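
For reference, a minimal sketch (not part of this commit) of how a caller of the mapper package would pick the replacement policy through the new variadic CacheOption parameter. The import path is taken from the fsm import above; the mapping file path is hypothetical.

    package main

    import (
    	"log"

    	"github.com/prometheus/statsd_exporter/pkg/mapper"
    )

    func main() {
    	m := &mapper.MetricMapper{}

    	// Select the random-replacement cache; omitting the option keeps the
    	// default "lru" policy resolved in InitCache above.
    	err := m.InitFromFile("statsd_mapping.yml", 1000, mapper.WithCacheType("random"))
    	if err != nil {
    		log.Fatalf("error loading config: %v", err)
    	}
    }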

(third changed file: mapper package — benchmarks)

@@ -15,6 +15,7 @@ package mapper

 import (
 	"fmt"
+	"math/rand"
 	"testing"
 )

@@ -580,17 +581,21 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
 		"metric100.a",
 	}

-	mapper := MetricMapper{}
-	err := mapper.InitFromYAMLString(config, 1000)
-	if err != nil {
-		b.Fatalf("Config load error: %s %s", config, err)
-	}
-
-	b.ResetTimer()
-	for j := 0; j < b.N; j++ {
-		for _, metric := range mappings {
-			mapper.GetMapping(metric, MetricTypeCounter)
-		}
-	}
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
 }
@@ -626,17 +631,21 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex)
 		"metric5.a",
 	}

-	mapper := MetricMapper{}
-	err := mapper.InitFromYAMLString(config, 1000)
-	if err != nil {
-		b.Fatalf("Config load error: %s %s", config, err)
-	}
-
-	b.ResetTimer()
-	for j := 0; j < b.N; j++ {
-		for _, metric := range mappings {
-			mapper.GetMapping(metric, MetricTypeCounter)
-		}
-	}
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
 }
@@ -668,17 +677,21 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
 		"metric100.a",
 	}

-	mapper := MetricMapper{}
-	err := mapper.InitFromYAMLString(config, 1000)
-	if err != nil {
-		b.Fatalf("Config load error: %s %s", config, err)
-	}
-
-	b.ResetTimer()
-	for j := 0; j < b.N; j++ {
-		for _, metric := range mappings {
-			mapper.GetMapping(metric, MetricTypeCounter)
-		}
-	}
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
 }
@@ -800,17 +813,21 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob)
 		"metric50.a.b.c.d.e.f.g.h.i.j.k.l",
 	}

-	mapper := MetricMapper{}
-	err := mapper.InitFromYAMLString(config, 1000)
-	if err != nil {
-		b.Fatalf("Config load error: %s %s", config, err)
-	}
-
-	b.ResetTimer()
-	for j := 0; j < b.N; j++ {
-		for _, metric := range mappings {
-			mapper.GetMapping(metric, MetricTypeCounter)
-		}
-	}
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
 }
@@ -869,16 +886,115 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex)
 		"metric100.a.b.c.d.e.f.g.h.i.j.k.l",
 	}

-	mapper := MetricMapper{}
-	err := mapper.InitFromYAMLString(config, 1000)
-	if err != nil {
-		b.Fatalf("Config load error: %s %s", config, err)
-	}
-
-	b.ResetTimer()
-	for j := 0; j < b.N; j++ {
-		for _, metric := range mappings {
-			mapper.GetMapping(metric, MetricTypeCounter)
-		}
-	}
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
+}
+
+func duplicateMetrics(count int, template string) []string {
+	var out []string
+	for i := 0; i < count; i++ {
+		out = append(out, fmt.Sprintf(template, i))
+	}
+	return out
+}
+
+func BenchmarkGlob100RulesCached100Metrics(b *testing.B) {
+	config := `---
+mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
+
+	mappings := duplicateMetrics(100, "metric100")
+
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkGlob100RulesCached100MetricsSmallCache(b *testing.B) {
+	// This benchmark is the worst case for the LRU cache.
+	// The cache is smaller than the total number of metrics and
+	// we iterate linearly through the metrics, so we will
+	// constantly evict cache entries.
+	config := `---
+mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
+
+	mappings := duplicateMetrics(100, "metric100")
+
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkGlob100RulesCached100MetricsRandomSmallCache(b *testing.B) {
+	// Slightly more realistic benchmark with a smaller cache.
+	// Randomly match metrics so we should have some cache hits.
+	config := `---
+mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
+
+	base := duplicateMetrics(100, "metric100")
+	var mappings []string
+	for i := 0; i < 10; i++ {
+		mappings = append(mappings, base...)
+	}
+
+	r := rand.New(rand.NewSource(42))
+	r.Shuffle(len(mappings), func(i, j int) {
+		mappings[i], mappings[j] = mappings[j], mappings[i]
+	})
+
+	for _, cacheType := range []string{"lru", "random"} {
+		b.Run(cacheType, func(b *testing.B) {
+			mapper := MetricMapper{}
+			err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType))
+			if err != nil {
+				b.Fatalf("Config load error: %s %s", config, err)
+			}
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				for _, metric := range mappings {
+					mapper.GetMapping(metric, MetricTypeCounter)
+				}
+			}
+		})
+	}
 }
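
Because each benchmark body is now wrapped in b.Run, a single run reports separate "lru" and "random" sub-benchmark results side by side; for example (package path assumed, not shown on this page):

    go test -bench 'Glob100RulesCached100Metrics' -benchmem ./pkg/mapper/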

(fourth changed file: mapper package — metric mapper cache)

@@ -14,6 +14,8 @@
 package mapper

 import (
+	"sync"
+
 	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
 )

@@ -39,6 +41,18 @@ var (
 	)
 )

+type cacheOptions struct {
+	cacheType string
+}
+
+type CacheOption func(*cacheOptions)
+
+func WithCacheType(cacheType string) CacheOption {
+	return func(o *cacheOptions) {
+		o.cacheType = cacheType
+	}
+}
+
 type MetricMapperCacheResult struct {
 	Mapping *MetricMapping
 	Matched bool

@@ -114,6 +128,69 @@ func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricType) {
 	return
 }

+type MetricMapperRRCache struct {
+	MetricMapperCache
+	lock  sync.RWMutex
+	size  int
+	items map[string]*MetricMapperCacheResult
+}
+
+func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) {
+	cacheLength.Set(0)
+	c := &MetricMapperRRCache{
+		items: make(map[string]*MetricMapperCacheResult, size+1),
+		size:  size,
+	}
+	return c, nil
+}
+
+func (m *MetricMapperRRCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
+	key := formatKey(metricString, metricType)
+
+	m.lock.RLock()
+	result, ok := m.items[key]
+	m.lock.RUnlock()
+
+	return result, ok
+}
+
+func (m *MetricMapperRRCache) addItem(metricString string, metricType MetricType, result *MetricMapperCacheResult) {
+	go m.trackCacheLength()
+
+	key := formatKey(metricString, metricType)
+
+	m.lock.Lock()
+
+	m.items[key] = result
+
+	// evict an item if needed
+	if len(m.items) > m.size {
+		for k := range m.items {
+			delete(m.items, k)
+			break
+		}
+	}
+
+	m.lock.Unlock()
+}
+
+func (m *MetricMapperRRCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) {
+	e := &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels}
+	m.addItem(metricString, metricType, e)
+}
+
+func (m *MetricMapperRRCache) AddMiss(metricString string, metricType MetricType) {
+	e := &MetricMapperCacheResult{Matched: false}
+	m.addItem(metricString, metricType, e)
+}
+
+func (m *MetricMapperRRCache) trackCacheLength() {
+	m.lock.RLock()
+	length := len(m.items)
+	m.lock.RUnlock()
+	cacheLength.Set(float64(length))
+}
+
 func init() {
 	prometheus.MustRegister(cacheLength)
 	prometheus.MustRegister(cacheGetsTotal)
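
To make the replacement behaviour above concrete, a small sketch (not part of this commit) against the exported cache API: once the map holds more than size entries, addItem deletes the first key returned by ranging over the map, so the evicted entry is effectively arbitrary (Go randomizes map iteration order) and may even be the entry that was just inserted. The metric names below are made up.

    package main

    import (
    	"fmt"

    	"github.com/prometheus/statsd_exporter/pkg/mapper"
    )

    func main() {
    	// A random-replacement cache that holds at most two entries.
    	c, err := mapper.NewMetricMapperRRCache(2)
    	if err != nil {
    		panic(err)
    	}

    	// The third insert pushes the map over its size and triggers one
    	// arbitrary eviction.
    	c.AddMiss("metric_a", mapper.MetricTypeCounter)
    	c.AddMiss("metric_b", mapper.MetricTypeCounter)
    	c.AddMiss("metric_c", mapper.MetricTypeCounter)

    	// Exactly two of the three lookups report a hit; which two varies run to run.
    	for _, name := range []string{"metric_a", "metric_b", "metric_c"} {
    		_, ok := c.Get(name, mapper.MetricTypeCounter)
    		fmt.Printf("%s cached: %v\n", name, ok)
    	}
    }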