From 90e247b09117cd5eaed285e8a2202ff77bfd763a Mon Sep 17 00:00:00 2001 From: bakins Date: Thu, 2 Jan 2020 08:39:39 -0500 Subject: [PATCH] Add random replacement cache Signed-off-by: bakins --- main.go | 13 +- pkg/mapper/mapper.go | 33 ++++- pkg/mapper/mapper_benchmark_test.go | 216 +++++++++++++++++++++------- pkg/mapper/mapper_cache.go | 77 ++++++++++ 4 files changed, 277 insertions(+), 62 deletions(-) diff --git a/main.go b/main.go index 221b1ce..b36d3c4 100644 --- a/main.go +++ b/main.go @@ -101,7 +101,7 @@ func tcpAddrFromString(addr string) (*net.TCPAddr, error) { }, nil } -func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger) { +func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGHUP) @@ -112,7 +112,7 @@ func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, continue } level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s) - err := mapper.InitFromFile(fileName, cacheSize) + err := mapper.InitFromFile(fileName, cacheSize, option) if err != nil { level.Info(logger).Log("msg", "Error reloading config", "error", err) configLoads.WithLabelValues("failure").Inc() @@ -149,6 +149,7 @@ func main() { mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int() + cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random") eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events").Default("10000").Int() eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing").Default("1000").Int() eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration() @@ -162,6 +163,8 @@ func main() { kingpin.Parse() logger := promlog.New(promlogConfig) + cacheOption := mapper.WithCacheType(*cacheType) + if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" { level.Error(logger).Log("At least one of UDP/TCP/Unixgram listeners must be specified.") os.Exit(1) @@ -268,7 +271,7 @@ func main() { mapper := &mapper.MetricMapper{MappingsCount: mappingsCount} if *mappingConfig != "" { - err := mapper.InitFromFile(*mappingConfig, *cacheSize) + err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) if err != nil { level.Error(logger).Log("msg", "error loading config", "error", err) os.Exit(1) @@ -283,10 +286,10 @@ func main() { } } } else { - mapper.InitCache(*cacheSize) + mapper.InitCache(*cacheSize, cacheOption) } - go configReloader(*mappingConfig, mapper, *cacheSize, logger) + go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) exporter := NewExporter(mapper, logger) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 6496310..e87ebff 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -18,12 +18,12 @@ import ( "io/ioutil" "regexp" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/statsd_exporter/pkg/mapper/fsm" yaml "gopkg.in/yaml.v2" - "time" ) var ( @@ -85,7 +85,7 @@ var defaultQuantiles = []metricObjective{ {Quantile: 0.99, Error: 0.001}, } -func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) error { +func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, options ...CacheOption) error { var n MetricMapper if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil { @@ -191,7 +191,7 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) er m.Defaults = n.Defaults m.Mappings = n.Mappings - m.InitCache(cacheSize) + m.InitCache(cacheSize, options...) if n.doFSM { var mappings []string @@ -213,20 +213,39 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) er return nil } -func (m *MetricMapper) InitFromFile(fileName string, cacheSize int) error { +func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...CacheOption) error { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { return err } - return m.InitFromYAMLString(string(mappingStr), cacheSize) + return m.InitFromYAMLString(string(mappingStr), cacheSize, options...) } -func (m *MetricMapper) InitCache(cacheSize int) { +func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) { if cacheSize == 0 { m.cache = NewMetricMapperNoopCache() } else { - cache, err := NewMetricMapperCache(cacheSize) + o := cacheOptions{ + cacheType: "lru", + } + for _, f := range options { + f(&o) + } + + var ( + cache MetricMapperCache + err error + ) + switch o.cacheType { + case "lru": + cache, err = NewMetricMapperCache(cacheSize) + case "random": + cache, err = NewMetricMapperRRCache(cacheSize) + default: + err = fmt.Errorf("unsupported cache type %q", o.cacheType) + } + if err != nil { log.Fatalf("Unable to setup metric cache. Caused by: %s", err) } diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index fe76d78..9210571 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -15,6 +15,7 @@ package mapper import ( "fmt" + "math/rand" "testing" ) @@ -580,17 +581,21 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) "metric100.a", } - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000) - if err != nil { - b.Fatalf("Config load error: %s %s", config, err) - } + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } - b.ResetTimer() - for j := 0; j < b.N; j++ { - for _, metric := range mappings { - mapper.GetMapping(metric, MetricTypeCounter) - } + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) } } @@ -626,17 +631,21 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) "metric5.a", } - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000) - if err != nil { - b.Fatalf("Config load error: %s %s", config, err) - } + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } - b.ResetTimer() - for j := 0; j < b.N; j++ { - for _, metric := range mappings { - mapper.GetMapping(metric, MetricTypeCounter) - } + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) } } @@ -668,17 +677,21 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) "metric100.a", } - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000) - if err != nil { - b.Fatalf("Config load error: %s %s", config, err) - } + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } - b.ResetTimer() - for j := 0; j < b.N; j++ { - for _, metric := range mappings { - mapper.GetMapping(metric, MetricTypeCounter) - } + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) } } @@ -800,17 +813,21 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) "metric50.a.b.c.d.e.f.g.h.i.j.k.l", } - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000) - if err != nil { - b.Fatalf("Config load error: %s %s", config, err) - } + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } - b.ResetTimer() - for j := 0; j < b.N; j++ { - for _, metric := range mappings { - mapper.GetMapping(metric, MetricTypeCounter) - } + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) } } @@ -869,16 +886,115 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) "metric100.a.b.c.d.e.f.g.h.i.j.k.l", } - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000) - if err != nil { - b.Fatalf("Config load error: %s %s", config, err) - } + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } - b.ResetTimer() - for j := 0; j < b.N; j++ { - for _, metric := range mappings { - mapper.GetMapping(metric, MetricTypeCounter) - } + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) + } +} + +func duplicateMetrics(count int, template string) []string { + var out []string + for i := 0; i < count; i++ { + out = append(out, fmt.Sprintf(template, i)) + } + return out +} + +func BenchmarkGlob100RulesCached100Metrics(b *testing.B) { + config := `--- +mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) + + mappings := duplicateMetrics(100, "metric100") + + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) + } +} + +func BenchmarkGlob100RulesCached100MetricsSmallCache(b *testing.B) { + // This benchmark is the worst case for the LRU cache. + // The cache is smaller than the total number of metrics and + // we iterate linearly through the metrics, so we will + // constantly evict cache entries. + config := `--- +mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) + + mappings := duplicateMetrics(100, "metric100") + + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) + } +} + +func BenchmarkGlob100RulesCached100MetricsRandomSmallCache(b *testing.B) { + // Slighly more realistic benchmark with a smaller cache. + // Randomly match metrics so we should have some cache hits. + config := `--- +mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) + + base := duplicateMetrics(100, "metric100") + var mappings []string + for i := 0; i < 10; i++ { + mappings = append(mappings, base...) + } + + r := rand.New(rand.NewSource(42)) + r.Shuffle(len(mappings), func(i, j int) { + mappings[i], mappings[j] = mappings[j], mappings[i] + }) + + for _, cacheType := range []string{"lru", "random"} { + b.Run(cacheType, func(b *testing.B) { + mapper := MetricMapper{} + err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + b.ResetTimer() + for j := 0; j < b.N; j++ { + for _, metric := range mappings { + mapper.GetMapping(metric, MetricTypeCounter) + } + } + }) } } diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 1452d65..5b252a7 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -14,6 +14,8 @@ package mapper import ( + "sync" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" ) @@ -39,6 +41,18 @@ var ( ) ) +type cacheOptions struct { + cacheType string +} + +type CacheOption func(*cacheOptions) + +func WithCacheType(cacheType string) CacheOption { + return func(o *cacheOptions) { + o.cacheType = cacheType + } +} + type MetricMapperCacheResult struct { Mapping *MetricMapping Matched bool @@ -114,6 +128,69 @@ func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricTy return } +type MetricMapperRRCache struct { + MetricMapperCache + lock sync.RWMutex + size int + items map[string]*MetricMapperCacheResult +} + +func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) { + cacheLength.Set(0) + c := &MetricMapperRRCache{ + items: make(map[string]*MetricMapperCacheResult, size+1), + size: size, + } + return c, nil +} + +func (m *MetricMapperRRCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { + key := formatKey(metricString, metricType) + + m.lock.RLock() + result, ok := m.items[key] + m.lock.RUnlock() + + return result, ok +} + +func (m *MetricMapperRRCache) addItem(metricString string, metricType MetricType, result *MetricMapperCacheResult) { + go m.trackCacheLength() + + key := formatKey(metricString, metricType) + + m.lock.Lock() + + m.items[key] = result + + // evict an item if needed + if len(m.items) > m.size { + for k := range m.items { + delete(m.items, k) + break + } + } + + m.lock.Unlock() +} + +func (m *MetricMapperRRCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { + e := &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels} + m.addItem(metricString, metricType, e) +} + +func (m *MetricMapperRRCache) AddMiss(metricString string, metricType MetricType) { + e := &MetricMapperCacheResult{Matched: false} + m.addItem(metricString, metricType, e) +} + +func (m *MetricMapperRRCache) trackCacheLength() { + m.lock.RLock() + length := len(m.items) + m.lock.RUnlock() + cacheLength.Set(float64(length)) +} + func init() { prometheus.MustRegister(cacheLength) prometheus.MustRegister(cacheGetsTotal)