From 0099df7f71603be5166ded21f544fd6fe69534e2 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 19:22:29 -0500 Subject: [PATCH] move lru and rr mapper caches to their own package and make mapper_cache a better an interface for implementing externally` Signed-off-by: glightfoot --- bridge_test.go | 2 +- exporter_benchmark_test.go | 3 +- main.go | 57 +++++-- pkg/exporter/exporter_test.go | 17 +- pkg/mapper/mapper.go | 81 +++++----- pkg/mapper/mapper_benchmark_test.go | 152 +++++++++++++----- pkg/mapper/mapper_cache.go | 152 ++---------------- pkg/mapper/mapper_test.go | 24 ++- pkg/mapper_cache/lru/lru.go | 52 ++++++ pkg/mapper_cache/metrics.go | 39 +++++ .../randomreplacement/randomreplacement.go | 70 ++++++++ 11 files changed, 403 insertions(+), 246 deletions(-) create mode 100644 pkg/mapper_cache/lru/lru.go create mode 100644 pkg/mapper_cache/metrics.go create mode 100644 pkg/mapper_cache/randomreplacement/randomreplacement.go diff --git a/bridge_test.go b/bridge_test.go index ea2d637..57e53b9 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -650,7 +650,7 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 17be295..b6697bf 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/line" @@ -167,7 +168,7 @@ mappings: ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } diff --git a/main.go b/main.go index e6ac3ad..5c08780 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,8 @@ import ( "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" ) const ( @@ -206,7 +208,7 @@ func serveHTTP(mux http.Handler, listenAddress string, logger log.Logger) { os.Exit(1) } -func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { +func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, logger log.Logger) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGHUP) @@ -218,12 +220,12 @@ func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSiz level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s) - reloadConfig(fileName, mapper, cacheSize, logger, option) + reloadConfig(fileName, mapper, logger) } } -func reloadConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { - err := mapper.InitFromFile(fileName, cacheSize, option) +func reloadConfig(fileName string, mapper *mapper.MetricMapper, logger log.Logger) { + err := mapper.InitFromFile(fileName) if err != nil { level.Info(logger).Log("msg", "Error reloading config", "error", err) configLoads.WithLabelValues("failure").Inc() @@ -247,6 +249,29 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger return nil } +func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) (mapper.MetricMapperCache, error) { + var cache mapper.MetricMapperCache + var err error + if cacheSize == 0 { + cache = mapper.NewMetricMapperNoopCache() + } else { + switch cacheType { + case "lru": + cache, err = lru.NewMetricMapperLRUCache(registerer, cacheSize) + case "random": + cache, err = randomreplacement.NewMetricMapperRRCache(registerer, cacheSize) + default: + err = fmt.Errorf("unsupported cache type %q", cacheType) + } + + if err != nil { + return nil, err + } + } + + return cache, nil +} + func main() { var ( listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String() @@ -293,8 +318,6 @@ func main() { parser.EnableSignalFXParsing() } - cacheOption := mapper.WithCacheType(*cacheType) - level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info()) level.Info(logger).Log("msg", "Build context", "context", version.BuildContext()) @@ -302,15 +325,23 @@ func main() { defer close(events) eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) - mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount} + thisMapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount} + + cache, err := getCache(*cacheSize, *cacheType, thisMapper.Registerer) + if err != nil { + level.Error(logger).Log("msg", "Unable to setup metric mapper cache", "error", err) + os.Exit(1) + } + thisMapper.UseCache(cache) + if *mappingConfig != "" { - err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) + err := thisMapper.InitFromFile(*mappingConfig) if err != nil { level.Error(logger).Log("msg", "error loading config", "error", err) os.Exit(1) } if *dumpFSMPath != "" { - err := dumpFSM(mapper, *dumpFSMPath, logger) + err := dumpFSM(thisMapper, *dumpFSMPath, logger) if err != nil { level.Error(logger).Log("msg", "error dumping FSM", "error", err) // Failure to dump the FSM is an error (the user asked for it and it @@ -318,11 +349,9 @@ func main() { // afterwards). } } - } else { - mapper.InitCache(*cacheSize, cacheOption) } - exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) if *checkConfig { level.Info(logger).Log("msg", "Configuration check successful, exiting") @@ -489,7 +518,7 @@ func main() { return } level.Info(logger).Log("msg", "Received lifecycle api reload, attempting reload") - reloadConfig(*mappingConfig, mapper, *cacheSize, logger, cacheOption) + reloadConfig(*mappingConfig, thisMapper, logger) } }) mux.HandleFunc("/-/quit", func(w http.ResponseWriter, r *http.Request) { @@ -518,7 +547,7 @@ func main() { go serveHTTP(mux, *listenAddress, logger) - go sighupConfigReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) + go sighupConfigReloader(*mappingConfig, thisMapper, logger) go exporter.Listen(events) signals := make(chan os.Signal, 1) diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index 607e14c..bc17648 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -182,7 +182,7 @@ func TestNegativeCounter(t *testing.T) { prev := getTelemetryCounterValue(errorCounter) testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) + testMapper.UseCache(mapper.NewMetricMapperNoopCache()) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -260,7 +260,7 @@ mappings: name: "histogram_test" ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -318,7 +318,8 @@ mappings: ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + testMapper.UseCache(mapper.NewMetricMapperNoopCache()) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -528,7 +529,7 @@ mappings: for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -585,7 +586,7 @@ mappings: name: "${1}" ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -658,7 +659,6 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { }() testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -672,7 +672,6 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -717,7 +716,6 @@ func TestHistogramUnits(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram ex.Listen(events) @@ -754,7 +752,6 @@ func TestCounterIncrement(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) }() @@ -857,7 +854,7 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 680952b..8666a6f 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -71,7 +71,7 @@ var defaultQuantiles = []metricObjective{ {Quantile: 0.99, Error: 0.001}, } -func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, options ...CacheOption) error { +func (m *MetricMapper) InitFromYAMLString(fileContents string) error { var n MetricMapper if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil { @@ -230,7 +230,13 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op m.Defaults = n.Defaults m.Mappings = n.Mappings - m.InitCache(cacheSize, options...) + + // If no cache has been configured, use a noop cache + if m.cache == nil { + m.cache = NewMetricMapperNoopCache() + } + // Reset the cache since this function can be used to reload config + m.cache.Reset() if n.doFSM { var mappings []string @@ -252,53 +258,34 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op return nil } -func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...CacheOption) error { +func (m *MetricMapper) InitFromFile(fileName string) error { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { return err } - return m.InitFromYAMLString(string(mappingStr), cacheSize, options...) + return m.InitFromYAMLString(string(mappingStr)) } -func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) { - if cacheSize == 0 { - m.cache = NewMetricMapperNoopCache(m.Registerer) - } else { - o := cacheOptions{ - cacheType: "lru", - } - for _, f := range options { - f(&o) - } - - var ( - cache MetricMapperCache - err error - ) - switch o.cacheType { - case "lru": - cache, err = NewMetricMapperCache(m.Registerer, cacheSize) - case "random": - cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize) - default: - err = fmt.Errorf("unsupported cache type %q", o.cacheType) - } - - if err != nil { - log.Fatalf("Unable to setup metric cache. Caused by: %s", err) - } - m.cache = cache - } +func (m *MetricMapper) UseCache(cache MetricMapperCache) { + m.cache = cache } func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) { m.mutex.RLock() defer m.mutex.RUnlock() - result, cached := m.cache.Get(statsdMetric, statsdMetricType) - if cached { - return result.Mapping, result.Labels, result.Matched + + // default cache to noop cache if used from an uninitialized mapper + if m.cache == nil { + m.cache = NewMetricMapperNoopCache() } + + result, cached := m.cache.Get(formatKey(statsdMetric, statsdMetricType)) + if cached { + r := result.(MetricMapperCacheResult) + return r.Mapping, r.Labels, r.Matched + } + // glob matching if m.doFSM { finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType)) @@ -312,12 +299,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy labels[result.labelKeys[index]] = formatter.Format(captures) } - m.cache.AddMatch(statsdMetric, statsdMetricType, result, labels) + r := MetricMapperCacheResult{ + Mapping: result, + Matched: true, + Labels: labels, + } + // add match to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) return result, labels, true } else if !m.doRegex { // if there's no regex match type, return immediately - m.cache.AddMiss(statsdMetric, statsdMetricType) + // Add miss cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) return nil, nil, false } } @@ -350,12 +344,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy labels[label] = string(value) } - m.cache.AddMatch(statsdMetric, statsdMetricType, &mapping, labels) + r := MetricMapperCacheResult{ + Mapping: &mapping, + Matched: true, + Labels: labels, + } + // Add Match to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) return &mapping, labels, true } - m.cache.AddMiss(statsdMetric, statsdMetricType) + // Add Miss to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) return nil, nil, false } diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index 9210571..5219e1d 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -17,6 +17,9 @@ import ( "fmt" "math/rand" "testing" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" ) var ( @@ -105,7 +108,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -169,7 +172,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -240,7 +243,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -304,7 +307,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -331,7 +334,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -359,7 +362,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -385,7 +388,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -412,7 +415,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -438,7 +441,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -465,7 +468,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -502,7 +505,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -540,7 +543,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -561,7 +564,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -582,9 +585,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -609,7 +621,7 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -632,9 +644,19 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -657,7 +679,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -678,9 +700,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -703,7 +734,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -726,7 +757,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -749,7 +780,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -772,7 +803,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -793,7 +824,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -814,9 +845,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -841,7 +881,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -864,7 +904,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -887,9 +927,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -914,14 +963,24 @@ func duplicateMetrics(count int, template string) []string { func BenchmarkGlob100RulesCached100Metrics(b *testing.B) { config := `--- -mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) +mappings:` + duplicateRules(101, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -947,9 +1006,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -982,9 +1050,19 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) }) for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 306e54e..4f9ef2b 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -14,9 +14,6 @@ package mapper import ( - "sync" - - lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" ) @@ -56,155 +53,38 @@ func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics { return &m } -type cacheOptions struct { - cacheType string -} - -type CacheOption func(*cacheOptions) - -func WithCacheType(cacheType string) CacheOption { - return func(o *cacheOptions) { - o.cacheType = cacheType - } -} - type MetricMapperCacheResult struct { Mapping *MetricMapping Matched bool Labels prometheus.Labels } +// MetricMapperCache must be thread-safe and should be instrumented with CacheMetrics type MetricMapperCache interface { - Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) - AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) - AddMiss(metricString string, metricType MetricType) + // Get a cached result + Get(metricKey string) (interface{}, bool) + // Add a statsd MetricMapperResult to the cache + Add(metricKey string, result interface{}) // Add an item to the cache + // Reset clears the cache for config reloads + Reset() } -type MetricMapperLRUCache struct { - MetricMapperCache - cache *lru.Cache - metrics *CacheMetrics +type MetricMapperNoopCache struct{} + +func NewMetricMapperNoopCache() *MetricMapperNoopCache { + return &MetricMapperNoopCache{} } -type MetricMapperNoopCache struct { - MetricMapperCache - metrics *CacheMetrics +func (m *MetricMapperNoopCache) Get(metricKey string) (interface{}, bool) { + return nil, false } -func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) { - metrics := NewCacheMetrics(reg) - cache, err := lru.New(size) - if err != nil { - return &MetricMapperLRUCache{}, err - } - return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil +func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) { + return } -func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - m.metrics.CacheGetsTotal.Inc() - if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok { - m.metrics.CacheHitsTotal.Inc() - return result.(*MetricMapperCacheResult), true - } else { - return nil, false - } -} - -func (m *MetricMapperLRUCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - go m.trackCacheLength() - m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels}) -} - -func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricType) { - go m.trackCacheLength() - m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Matched: false}) -} - -func (m *MetricMapperLRUCache) trackCacheLength() { - m.metrics.CacheLength.Set(float64(m.cache.Len())) -} +func (m *MetricMapperNoopCache) Reset() {} func formatKey(metricString string, metricType MetricType) string { return string(metricType) + "." + metricString } - -func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache { - return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)} -} - -func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - return nil, false -} - -func (m *MetricMapperNoopCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - return -} - -func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricType) { - return -} - -type MetricMapperRRCache struct { - MetricMapperCache - lock sync.RWMutex - size int - items map[string]*MetricMapperCacheResult - metrics *CacheMetrics -} - -func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) { - metrics := NewCacheMetrics(reg) - c := &MetricMapperRRCache{ - items: make(map[string]*MetricMapperCacheResult, size+1), - size: size, - metrics: metrics, - } - return c, nil -} - -func (m *MetricMapperRRCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - key := formatKey(metricString, metricType) - - m.lock.RLock() - result, ok := m.items[key] - m.lock.RUnlock() - - return result, ok -} - -func (m *MetricMapperRRCache) addItem(metricString string, metricType MetricType, result *MetricMapperCacheResult) { - go m.trackCacheLength() - - key := formatKey(metricString, metricType) - - m.lock.Lock() - - m.items[key] = result - - // evict an item if needed - if len(m.items) > m.size { - for k := range m.items { - delete(m.items, k) - break - } - } - - m.lock.Unlock() -} - -func (m *MetricMapperRRCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - e := &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels} - m.addItem(metricString, metricType, e) -} - -func (m *MetricMapperRRCache) AddMiss(metricString string, metricType MetricType) { - e := &MetricMapperCacheResult{Matched: false} - m.addItem(metricString, metricType, e) -} - -func (m *MetricMapperRRCache) trackCacheLength() { - m.lock.RLock() - length := len(m.items) - m.lock.RUnlock() - m.metrics.CacheLength.Set(float64(length)) -} diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index ad7e32e..006b8e5 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -18,6 +18,8 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" ) type mappings []struct { @@ -1379,12 +1381,15 @@ mappings: } mapper := MetricMapper{} + cache, _ := lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + mapper.UseCache(cache) + for i, scenario := range scenarios { if scenario.testName == "" { t.Fatalf("Missing testName in scenario %+v", scenario) } t.Run(scenario.testName, func(t *testing.T) { - err := mapper.InitFromYAMLString(scenario.config, 1000) + err := mapper.InitFromYAMLString(scenario.config) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) } @@ -1542,7 +1547,7 @@ mappings: } t.Run(scenario.testName, func(t *testing.T) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(scenario.config, 0) + err := mapper.InitFromYAMLString(scenario.config) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) } @@ -1571,7 +1576,7 @@ mappings: app: "$2" ` mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { t.Fatalf("config load error: %s ", err) } @@ -1583,19 +1588,24 @@ mappings: "aa.bb.dd.myapp": "aa_bb_dd_total", } + lruCache, err := lru.NewMetricMapperLRUCache(mapper.Registerer, len(names)) + if err != nil { + t.Fatalf(err.Error()) + } + scenarios := []struct { - cacheSize int + cache MetricMapperCache }{ { - cacheSize: 0, + cache: NewMetricMapperNoopCache(), }, { - cacheSize: len(names), + cache: lruCache, }, } for i, scenario := range scenarios { - mapper.InitCache(scenario.cacheSize) + mapper.UseCache(scenario.cache) // run multiple times to ensure cache works as expected for j := 0; j < 10; j++ { diff --git a/pkg/mapper_cache/lru/lru.go b/pkg/mapper_cache/lru/lru.go new file mode 100644 index 0000000..dc8d9ac --- /dev/null +++ b/pkg/mapper_cache/lru/lru.go @@ -0,0 +1,52 @@ +package lru + +import ( + "github.com/prometheus/client_golang/prometheus" + + lru2 "github.com/hashicorp/golang-lru" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache" +) + +type metricMapperLRUCache struct { + cache *lru2.Cache + metrics *mapper_cache.CacheMetrics +} + +func NewMetricMapperLRUCache(reg prometheus.Registerer, size int) (*metricMapperLRUCache, error) { + if size <= 0 { + return nil, nil + } + + metrics := mapper_cache.NewCacheMetrics(reg) + cache, err := lru2.New(size) + if err != nil { + return &metricMapperLRUCache{}, err + } + + return &metricMapperLRUCache{metrics: metrics, cache: cache}, nil +} + +func (m *metricMapperLRUCache) Get(metricKey string) (interface{}, bool) { + m.metrics.CacheGetsTotal.Inc() + if result, ok := m.cache.Get(metricKey); ok { + m.metrics.CacheHitsTotal.Inc() + return result, true + } else { + return nil, false + } +} + +func (m *metricMapperLRUCache) Add(metricKey string, result interface{}) { + go m.trackCacheLength() + m.cache.Add(metricKey, result) +} + +func (m *metricMapperLRUCache) trackCacheLength() { + m.metrics.CacheLength.Set(float64(m.cache.Len())) +} + +func (m *metricMapperLRUCache) Reset() { + m.cache.Purge() + m.metrics.CacheLength.Set(0) +} diff --git a/pkg/mapper_cache/metrics.go b/pkg/mapper_cache/metrics.go new file mode 100644 index 0000000..a6fa3f6 --- /dev/null +++ b/pkg/mapper_cache/metrics.go @@ -0,0 +1,39 @@ +package mapper_cache + +import "github.com/prometheus/client_golang/prometheus" + +type CacheMetrics struct { + CacheLength prometheus.Gauge + CacheGetsTotal prometheus.Counter + CacheHitsTotal prometheus.Counter +} + +func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics { + var m CacheMetrics + + m.CacheLength = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "statsd_metric_mapper_cache_length", + Help: "The count of unique metrics currently cached.", + }, + ) + m.CacheGetsTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_metric_mapper_cache_gets_total", + Help: "The count of total metric cache gets.", + }, + ) + m.CacheHitsTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_metric_mapper_cache_hits_total", + Help: "The count of total metric cache hits.", + }, + ) + + if reg != nil { + reg.MustRegister(m.CacheLength) + reg.MustRegister(m.CacheGetsTotal) + reg.MustRegister(m.CacheHitsTotal) + } + return &m +} diff --git a/pkg/mapper_cache/randomreplacement/randomreplacement.go b/pkg/mapper_cache/randomreplacement/randomreplacement.go new file mode 100644 index 0000000..0bc470f --- /dev/null +++ b/pkg/mapper_cache/randomreplacement/randomreplacement.go @@ -0,0 +1,70 @@ +package randomreplacement + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache" +) + +type metricMapperRRCache struct { + lock sync.RWMutex + size int + items map[string]interface{} + metrics *mapper_cache.CacheMetrics +} + +func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*metricMapperRRCache, error) { + if size <= 0 { + return nil, nil + } + + metrics := mapper_cache.NewCacheMetrics(reg) + c := &metricMapperRRCache{ + items: make(map[string]interface{}, size+1), + size: size, + metrics: metrics, + } + return c, nil +} + +func (m *metricMapperRRCache) Get(metricKey string) (interface{}, bool) { + m.lock.RLock() + result, ok := m.items[metricKey] + m.lock.RUnlock() + + return result, ok +} + +func (m *metricMapperRRCache) Add(metricKey string, result interface{}) { + go m.trackCacheLength() + + m.lock.Lock() + + m.items[metricKey] = result + + // evict an item if needed + if len(m.items) > m.size { + for k := range m.items { + delete(m.items, k) + break + } + } + + m.lock.Unlock() +} + +func (m *metricMapperRRCache) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.items = make(map[string]interface{}, m.size+1) + m.metrics.CacheLength.Set(0) +} + +func (m *metricMapperRRCache) trackCacheLength() { + m.lock.RLock() + length := len(m.items) + m.lock.RUnlock() + m.metrics.CacheLength.Set(float64(length)) +}