From 0099df7f71603be5166ded21f544fd6fe69534e2 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 19:22:29 -0500 Subject: [PATCH 1/7] move lru and rr mapper caches to their own package and make mapper_cache a better an interface for implementing externally` Signed-off-by: glightfoot --- bridge_test.go | 2 +- exporter_benchmark_test.go | 3 +- main.go | 57 +++++-- pkg/exporter/exporter_test.go | 17 +- pkg/mapper/mapper.go | 81 +++++----- pkg/mapper/mapper_benchmark_test.go | 152 +++++++++++++----- pkg/mapper/mapper_cache.go | 152 ++---------------- pkg/mapper/mapper_test.go | 24 ++- pkg/mapper_cache/lru/lru.go | 52 ++++++ pkg/mapper_cache/metrics.go | 39 +++++ .../randomreplacement/randomreplacement.go | 70 ++++++++ 11 files changed, 403 insertions(+), 246 deletions(-) create mode 100644 pkg/mapper_cache/lru/lru.go create mode 100644 pkg/mapper_cache/metrics.go create mode 100644 pkg/mapper_cache/randomreplacement/randomreplacement.go diff --git a/bridge_test.go b/bridge_test.go index ea2d637..57e53b9 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -650,7 +650,7 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 17be295..b6697bf 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/line" @@ -167,7 +168,7 @@ mappings: ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } diff --git a/main.go b/main.go index e6ac3ad..5c08780 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,8 @@ import ( "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" ) const ( @@ -206,7 +208,7 @@ func serveHTTP(mux http.Handler, listenAddress string, logger log.Logger) { os.Exit(1) } -func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { +func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, logger log.Logger) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGHUP) @@ -218,12 +220,12 @@ func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSiz level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s) - reloadConfig(fileName, mapper, cacheSize, logger, option) + reloadConfig(fileName, mapper, logger) } } -func reloadConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { - err := mapper.InitFromFile(fileName, cacheSize, option) +func reloadConfig(fileName string, mapper *mapper.MetricMapper, logger log.Logger) { + err := mapper.InitFromFile(fileName) if err != nil { level.Info(logger).Log("msg", "Error reloading config", "error", err) configLoads.WithLabelValues("failure").Inc() @@ -247,6 +249,29 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger return nil } +func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) (mapper.MetricMapperCache, error) { + var cache mapper.MetricMapperCache + var err error + if cacheSize == 0 { + cache = mapper.NewMetricMapperNoopCache() + } else { + switch cacheType { + case "lru": + cache, err = lru.NewMetricMapperLRUCache(registerer, cacheSize) + case "random": + cache, err = randomreplacement.NewMetricMapperRRCache(registerer, cacheSize) + default: + err = fmt.Errorf("unsupported cache type %q", cacheType) + } + + if err != nil { + return nil, err + } + } + + return cache, nil +} + func main() { var ( listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String() @@ -293,8 +318,6 @@ func main() { parser.EnableSignalFXParsing() } - cacheOption := mapper.WithCacheType(*cacheType) - level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info()) level.Info(logger).Log("msg", "Build context", "context", version.BuildContext()) @@ -302,15 +325,23 @@ func main() { defer close(events) eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) - mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount} + thisMapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount} + + cache, err := getCache(*cacheSize, *cacheType, thisMapper.Registerer) + if err != nil { + level.Error(logger).Log("msg", "Unable to setup metric mapper cache", "error", err) + os.Exit(1) + } + thisMapper.UseCache(cache) + if *mappingConfig != "" { - err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) + err := thisMapper.InitFromFile(*mappingConfig) if err != nil { level.Error(logger).Log("msg", "error loading config", "error", err) os.Exit(1) } if *dumpFSMPath != "" { - err := dumpFSM(mapper, *dumpFSMPath, logger) + err := dumpFSM(thisMapper, *dumpFSMPath, logger) if err != nil { level.Error(logger).Log("msg", "error dumping FSM", "error", err) // Failure to dump the FSM is an error (the user asked for it and it @@ -318,11 +349,9 @@ func main() { // afterwards). } } - } else { - mapper.InitCache(*cacheSize, cacheOption) } - exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) if *checkConfig { level.Info(logger).Log("msg", "Configuration check successful, exiting") @@ -489,7 +518,7 @@ func main() { return } level.Info(logger).Log("msg", "Received lifecycle api reload, attempting reload") - reloadConfig(*mappingConfig, mapper, *cacheSize, logger, cacheOption) + reloadConfig(*mappingConfig, thisMapper, logger) } }) mux.HandleFunc("/-/quit", func(w http.ResponseWriter, r *http.Request) { @@ -518,7 +547,7 @@ func main() { go serveHTTP(mux, *listenAddress, logger) - go sighupConfigReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) + go sighupConfigReloader(*mappingConfig, thisMapper, logger) go exporter.Listen(events) signals := make(chan os.Signal, 1) diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index 607e14c..bc17648 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -182,7 +182,7 @@ func TestNegativeCounter(t *testing.T) { prev := getTelemetryCounterValue(errorCounter) testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) + testMapper.UseCache(mapper.NewMetricMapperNoopCache()) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -260,7 +260,7 @@ mappings: name: "histogram_test" ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -318,7 +318,8 @@ mappings: ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + testMapper.UseCache(mapper.NewMetricMapperNoopCache()) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -528,7 +529,7 @@ mappings: for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -585,7 +586,7 @@ mappings: name: "${1}" ` testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } @@ -658,7 +659,6 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { }() testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -672,7 +672,6 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -717,7 +716,6 @@ func TestHistogramUnits(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram ex.Listen(events) @@ -754,7 +752,6 @@ func TestCounterIncrement(t *testing.T) { events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} - testMapper.InitCache(0) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) }() @@ -857,7 +854,7 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config, 0) + err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 680952b..8666a6f 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -71,7 +71,7 @@ var defaultQuantiles = []metricObjective{ {Quantile: 0.99, Error: 0.001}, } -func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, options ...CacheOption) error { +func (m *MetricMapper) InitFromYAMLString(fileContents string) error { var n MetricMapper if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil { @@ -230,7 +230,13 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op m.Defaults = n.Defaults m.Mappings = n.Mappings - m.InitCache(cacheSize, options...) + + // If no cache has been configured, use a noop cache + if m.cache == nil { + m.cache = NewMetricMapperNoopCache() + } + // Reset the cache since this function can be used to reload config + m.cache.Reset() if n.doFSM { var mappings []string @@ -252,53 +258,34 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op return nil } -func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...CacheOption) error { +func (m *MetricMapper) InitFromFile(fileName string) error { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { return err } - return m.InitFromYAMLString(string(mappingStr), cacheSize, options...) + return m.InitFromYAMLString(string(mappingStr)) } -func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) { - if cacheSize == 0 { - m.cache = NewMetricMapperNoopCache(m.Registerer) - } else { - o := cacheOptions{ - cacheType: "lru", - } - for _, f := range options { - f(&o) - } - - var ( - cache MetricMapperCache - err error - ) - switch o.cacheType { - case "lru": - cache, err = NewMetricMapperCache(m.Registerer, cacheSize) - case "random": - cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize) - default: - err = fmt.Errorf("unsupported cache type %q", o.cacheType) - } - - if err != nil { - log.Fatalf("Unable to setup metric cache. Caused by: %s", err) - } - m.cache = cache - } +func (m *MetricMapper) UseCache(cache MetricMapperCache) { + m.cache = cache } func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) { m.mutex.RLock() defer m.mutex.RUnlock() - result, cached := m.cache.Get(statsdMetric, statsdMetricType) - if cached { - return result.Mapping, result.Labels, result.Matched + + // default cache to noop cache if used from an uninitialized mapper + if m.cache == nil { + m.cache = NewMetricMapperNoopCache() } + + result, cached := m.cache.Get(formatKey(statsdMetric, statsdMetricType)) + if cached { + r := result.(MetricMapperCacheResult) + return r.Mapping, r.Labels, r.Matched + } + // glob matching if m.doFSM { finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType)) @@ -312,12 +299,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy labels[result.labelKeys[index]] = formatter.Format(captures) } - m.cache.AddMatch(statsdMetric, statsdMetricType, result, labels) + r := MetricMapperCacheResult{ + Mapping: result, + Matched: true, + Labels: labels, + } + // add match to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) return result, labels, true } else if !m.doRegex { // if there's no regex match type, return immediately - m.cache.AddMiss(statsdMetric, statsdMetricType) + // Add miss cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) return nil, nil, false } } @@ -350,12 +344,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy labels[label] = string(value) } - m.cache.AddMatch(statsdMetric, statsdMetricType, &mapping, labels) + r := MetricMapperCacheResult{ + Mapping: &mapping, + Matched: true, + Labels: labels, + } + // Add Match to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) return &mapping, labels, true } - m.cache.AddMiss(statsdMetric, statsdMetricType) + // Add Miss to cache + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) return nil, nil, false } diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index 9210571..5219e1d 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -17,6 +17,9 @@ import ( "fmt" "math/rand" "testing" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" ) var ( @@ -105,7 +108,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -169,7 +172,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -240,7 +243,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -304,7 +307,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -331,7 +334,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -359,7 +362,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -385,7 +388,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -412,7 +415,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -438,7 +441,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -465,7 +468,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -502,7 +505,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -540,7 +543,7 @@ mappings: } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -561,7 +564,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -582,9 +585,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -609,7 +621,7 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -632,9 +644,19 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -657,7 +679,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -678,9 +700,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -703,7 +734,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -726,7 +757,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -749,7 +780,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -772,7 +803,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -793,7 +824,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -814,9 +845,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -841,7 +881,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -864,7 +904,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -887,9 +927,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -914,14 +963,24 @@ func duplicateMetrics(count int, template string) []string { func BenchmarkGlob100RulesCached100Metrics(b *testing.B) { config := `--- -mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) +mappings:` + duplicateRules(101, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -947,9 +1006,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } @@ -982,9 +1050,19 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) }) for _, cacheType := range []string{"lru", "random"} { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) + } + mapper.UseCache(cache) + b.Run(cacheType, func(b *testing.B) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType)) + err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) } diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 306e54e..4f9ef2b 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -14,9 +14,6 @@ package mapper import ( - "sync" - - lru "github.com/hashicorp/golang-lru" "github.com/prometheus/client_golang/prometheus" ) @@ -56,155 +53,38 @@ func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics { return &m } -type cacheOptions struct { - cacheType string -} - -type CacheOption func(*cacheOptions) - -func WithCacheType(cacheType string) CacheOption { - return func(o *cacheOptions) { - o.cacheType = cacheType - } -} - type MetricMapperCacheResult struct { Mapping *MetricMapping Matched bool Labels prometheus.Labels } +// MetricMapperCache must be thread-safe and should be instrumented with CacheMetrics type MetricMapperCache interface { - Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) - AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) - AddMiss(metricString string, metricType MetricType) + // Get a cached result + Get(metricKey string) (interface{}, bool) + // Add a statsd MetricMapperResult to the cache + Add(metricKey string, result interface{}) // Add an item to the cache + // Reset clears the cache for config reloads + Reset() } -type MetricMapperLRUCache struct { - MetricMapperCache - cache *lru.Cache - metrics *CacheMetrics +type MetricMapperNoopCache struct{} + +func NewMetricMapperNoopCache() *MetricMapperNoopCache { + return &MetricMapperNoopCache{} } -type MetricMapperNoopCache struct { - MetricMapperCache - metrics *CacheMetrics +func (m *MetricMapperNoopCache) Get(metricKey string) (interface{}, bool) { + return nil, false } -func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) { - metrics := NewCacheMetrics(reg) - cache, err := lru.New(size) - if err != nil { - return &MetricMapperLRUCache{}, err - } - return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil +func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) { + return } -func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - m.metrics.CacheGetsTotal.Inc() - if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok { - m.metrics.CacheHitsTotal.Inc() - return result.(*MetricMapperCacheResult), true - } else { - return nil, false - } -} - -func (m *MetricMapperLRUCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - go m.trackCacheLength() - m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels}) -} - -func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricType) { - go m.trackCacheLength() - m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Matched: false}) -} - -func (m *MetricMapperLRUCache) trackCacheLength() { - m.metrics.CacheLength.Set(float64(m.cache.Len())) -} +func (m *MetricMapperNoopCache) Reset() {} func formatKey(metricString string, metricType MetricType) string { return string(metricType) + "." + metricString } - -func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache { - return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)} -} - -func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - return nil, false -} - -func (m *MetricMapperNoopCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - return -} - -func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricType) { - return -} - -type MetricMapperRRCache struct { - MetricMapperCache - lock sync.RWMutex - size int - items map[string]*MetricMapperCacheResult - metrics *CacheMetrics -} - -func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) { - metrics := NewCacheMetrics(reg) - c := &MetricMapperRRCache{ - items: make(map[string]*MetricMapperCacheResult, size+1), - size: size, - metrics: metrics, - } - return c, nil -} - -func (m *MetricMapperRRCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { - key := formatKey(metricString, metricType) - - m.lock.RLock() - result, ok := m.items[key] - m.lock.RUnlock() - - return result, ok -} - -func (m *MetricMapperRRCache) addItem(metricString string, metricType MetricType, result *MetricMapperCacheResult) { - go m.trackCacheLength() - - key := formatKey(metricString, metricType) - - m.lock.Lock() - - m.items[key] = result - - // evict an item if needed - if len(m.items) > m.size { - for k := range m.items { - delete(m.items, k) - break - } - } - - m.lock.Unlock() -} - -func (m *MetricMapperRRCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) { - e := &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels} - m.addItem(metricString, metricType, e) -} - -func (m *MetricMapperRRCache) AddMiss(metricString string, metricType MetricType) { - e := &MetricMapperCacheResult{Matched: false} - m.addItem(metricString, metricType, e) -} - -func (m *MetricMapperRRCache) trackCacheLength() { - m.lock.RLock() - length := len(m.items) - m.lock.RUnlock() - m.metrics.CacheLength.Set(float64(length)) -} diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index ad7e32e..006b8e5 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -18,6 +18,8 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" ) type mappings []struct { @@ -1379,12 +1381,15 @@ mappings: } mapper := MetricMapper{} + cache, _ := lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) + mapper.UseCache(cache) + for i, scenario := range scenarios { if scenario.testName == "" { t.Fatalf("Missing testName in scenario %+v", scenario) } t.Run(scenario.testName, func(t *testing.T) { - err := mapper.InitFromYAMLString(scenario.config, 1000) + err := mapper.InitFromYAMLString(scenario.config) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) } @@ -1542,7 +1547,7 @@ mappings: } t.Run(scenario.testName, func(t *testing.T) { mapper := MetricMapper{} - err := mapper.InitFromYAMLString(scenario.config, 0) + err := mapper.InitFromYAMLString(scenario.config) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err) } @@ -1571,7 +1576,7 @@ mappings: app: "$2" ` mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config, 0) + err := mapper.InitFromYAMLString(config) if err != nil { t.Fatalf("config load error: %s ", err) } @@ -1583,19 +1588,24 @@ mappings: "aa.bb.dd.myapp": "aa_bb_dd_total", } + lruCache, err := lru.NewMetricMapperLRUCache(mapper.Registerer, len(names)) + if err != nil { + t.Fatalf(err.Error()) + } + scenarios := []struct { - cacheSize int + cache MetricMapperCache }{ { - cacheSize: 0, + cache: NewMetricMapperNoopCache(), }, { - cacheSize: len(names), + cache: lruCache, }, } for i, scenario := range scenarios { - mapper.InitCache(scenario.cacheSize) + mapper.UseCache(scenario.cache) // run multiple times to ensure cache works as expected for j := 0; j < 10; j++ { diff --git a/pkg/mapper_cache/lru/lru.go b/pkg/mapper_cache/lru/lru.go new file mode 100644 index 0000000..dc8d9ac --- /dev/null +++ b/pkg/mapper_cache/lru/lru.go @@ -0,0 +1,52 @@ +package lru + +import ( + "github.com/prometheus/client_golang/prometheus" + + lru2 "github.com/hashicorp/golang-lru" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache" +) + +type metricMapperLRUCache struct { + cache *lru2.Cache + metrics *mapper_cache.CacheMetrics +} + +func NewMetricMapperLRUCache(reg prometheus.Registerer, size int) (*metricMapperLRUCache, error) { + if size <= 0 { + return nil, nil + } + + metrics := mapper_cache.NewCacheMetrics(reg) + cache, err := lru2.New(size) + if err != nil { + return &metricMapperLRUCache{}, err + } + + return &metricMapperLRUCache{metrics: metrics, cache: cache}, nil +} + +func (m *metricMapperLRUCache) Get(metricKey string) (interface{}, bool) { + m.metrics.CacheGetsTotal.Inc() + if result, ok := m.cache.Get(metricKey); ok { + m.metrics.CacheHitsTotal.Inc() + return result, true + } else { + return nil, false + } +} + +func (m *metricMapperLRUCache) Add(metricKey string, result interface{}) { + go m.trackCacheLength() + m.cache.Add(metricKey, result) +} + +func (m *metricMapperLRUCache) trackCacheLength() { + m.metrics.CacheLength.Set(float64(m.cache.Len())) +} + +func (m *metricMapperLRUCache) Reset() { + m.cache.Purge() + m.metrics.CacheLength.Set(0) +} diff --git a/pkg/mapper_cache/metrics.go b/pkg/mapper_cache/metrics.go new file mode 100644 index 0000000..a6fa3f6 --- /dev/null +++ b/pkg/mapper_cache/metrics.go @@ -0,0 +1,39 @@ +package mapper_cache + +import "github.com/prometheus/client_golang/prometheus" + +type CacheMetrics struct { + CacheLength prometheus.Gauge + CacheGetsTotal prometheus.Counter + CacheHitsTotal prometheus.Counter +} + +func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics { + var m CacheMetrics + + m.CacheLength = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "statsd_metric_mapper_cache_length", + Help: "The count of unique metrics currently cached.", + }, + ) + m.CacheGetsTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_metric_mapper_cache_gets_total", + Help: "The count of total metric cache gets.", + }, + ) + m.CacheHitsTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_metric_mapper_cache_hits_total", + Help: "The count of total metric cache hits.", + }, + ) + + if reg != nil { + reg.MustRegister(m.CacheLength) + reg.MustRegister(m.CacheGetsTotal) + reg.MustRegister(m.CacheHitsTotal) + } + return &m +} diff --git a/pkg/mapper_cache/randomreplacement/randomreplacement.go b/pkg/mapper_cache/randomreplacement/randomreplacement.go new file mode 100644 index 0000000..0bc470f --- /dev/null +++ b/pkg/mapper_cache/randomreplacement/randomreplacement.go @@ -0,0 +1,70 @@ +package randomreplacement + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/statsd_exporter/pkg/mapper_cache" +) + +type metricMapperRRCache struct { + lock sync.RWMutex + size int + items map[string]interface{} + metrics *mapper_cache.CacheMetrics +} + +func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*metricMapperRRCache, error) { + if size <= 0 { + return nil, nil + } + + metrics := mapper_cache.NewCacheMetrics(reg) + c := &metricMapperRRCache{ + items: make(map[string]interface{}, size+1), + size: size, + metrics: metrics, + } + return c, nil +} + +func (m *metricMapperRRCache) Get(metricKey string) (interface{}, bool) { + m.lock.RLock() + result, ok := m.items[metricKey] + m.lock.RUnlock() + + return result, ok +} + +func (m *metricMapperRRCache) Add(metricKey string, result interface{}) { + go m.trackCacheLength() + + m.lock.Lock() + + m.items[metricKey] = result + + // evict an item if needed + if len(m.items) > m.size { + for k := range m.items { + delete(m.items, k) + break + } + } + + m.lock.Unlock() +} + +func (m *metricMapperRRCache) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.items = make(map[string]interface{}, m.size+1) + m.metrics.CacheLength.Set(0) +} + +func (m *metricMapperRRCache) trackCacheLength() { + m.lock.RLock() + length := len(m.items) + m.lock.RUnlock() + m.metrics.CacheLength.Set(float64(length)) +} From de9a51c86383b0cee9cda209e83782b4e58a906f Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 22:59:41 -0500 Subject: [PATCH 2/7] fix TestMultipleMatches mapping indent Signed-off-by: glightfoot --- pkg/mapper/mapper_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 006b8e5..a9ac7e6 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -1573,7 +1573,7 @@ mappings: - match: aa.bb.*.* name: "aa_bb_${1}_total" labels: - app: "$2" + app: "$2" ` mapper := MetricMapper{} err := mapper.InitFromYAMLString(config) @@ -1620,5 +1620,4 @@ mappings: } } } - } From 32c612d5e805691df9e444f76ef45b616f4169fa Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 23:03:22 -0500 Subject: [PATCH 3/7] add license Signed-off-by: glightfoot --- pkg/mapper_cache/lru/lru.go | 13 +++++++++++++ pkg/mapper_cache/metrics.go | 13 +++++++++++++ .../randomreplacement/randomreplacement.go | 13 +++++++++++++ 3 files changed, 39 insertions(+) diff --git a/pkg/mapper_cache/lru/lru.go b/pkg/mapper_cache/lru/lru.go index dc8d9ac..e6739ee 100644 --- a/pkg/mapper_cache/lru/lru.go +++ b/pkg/mapper_cache/lru/lru.go @@ -1,3 +1,16 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package lru import ( diff --git a/pkg/mapper_cache/metrics.go b/pkg/mapper_cache/metrics.go index a6fa3f6..f3d76f7 100644 --- a/pkg/mapper_cache/metrics.go +++ b/pkg/mapper_cache/metrics.go @@ -1,3 +1,16 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package mapper_cache import "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/mapper_cache/randomreplacement/randomreplacement.go b/pkg/mapper_cache/randomreplacement/randomreplacement.go index 0bc470f..1dfaa28 100644 --- a/pkg/mapper_cache/randomreplacement/randomreplacement.go +++ b/pkg/mapper_cache/randomreplacement/randomreplacement.go @@ -1,3 +1,16 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package randomreplacement import ( From 940e653ea656796cf040fd60234b30c4ece5693f Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 23:21:49 -0500 Subject: [PATCH 4/7] cleanup Signed-off-by: glightfoot --- bridge_test.go | 2 +- main.go | 1 - pkg/event/event_test.go | 1 - pkg/exporter/exporter.go | 2 -- pkg/mapper/mapper.go | 2 -- pkg/mapper/mapper_cache.go | 4 +--- 6 files changed, 2 insertions(+), 10 deletions(-) diff --git a/bridge_test.go b/bridge_test.go index 57e53b9..ec3f676 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -572,7 +572,7 @@ func TestHandlePacket(t *testing.T) { le := len(events) // Flatten actual events. actual := event.Events{} - for i := 0; i < le; i++ { + for j := 0; j < le; j++ { actual = append(actual, <-events...) } diff --git a/main.go b/main.go index 5c08780..4dfa14b 100644 --- a/main.go +++ b/main.go @@ -492,7 +492,6 @@ func main() { } } } - } mux := http.NewServeMux() diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index d458ca5..192ce29 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -84,5 +84,4 @@ func TestEventIntervalFlush(t *testing.T) { if len(events) != 10 { t.Fatal("Expected 10 events in the event channel, but got", len(events)) } - } diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 0ee0b36..e5a628c 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -55,7 +55,6 @@ type Exporter struct { // Listen handles all events sent to the given channel sequentially. It // terminates when the channel is closed. func (b *Exporter) Listen(e <-chan event.Events) { - removeStaleMetricsTicker := clock.NewTicker(time.Second) for { @@ -77,7 +76,6 @@ func (b *Exporter) Listen(e <-chan event.Events) { // handleEvent processes a single Event according to the configured mapping. func (b *Exporter) handleEvent(thisEvent event.Event) { - mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType()) if mapping == nil { mapping = &mapper.MetricMapping{} diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 8666a6f..302a61f 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -144,7 +144,6 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { } currentMapping.labelFormatters = labelFormatters currentMapping.labelKeys = labelKeys - } else { if regex, err := regexp.Compile(currentMapping.Match); err != nil { return fmt.Errorf("invalid regex %s in mapping: %v", currentMapping.Match, err) @@ -222,7 +221,6 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { if currentMapping.Ttl == 0 && n.Defaults.Ttl > 0 { currentMapping.Ttl = n.Defaults.Ttl } - } m.mutex.Lock() diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 4f9ef2b..4d98e75 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -79,9 +79,7 @@ func (m *MetricMapperNoopCache) Get(metricKey string) (interface{}, bool) { return nil, false } -func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) { - return -} +func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) {} func (m *MetricMapperNoopCache) Reset() {} From aa529c88841c1784ad8534e45a51fc2e3307c3ad Mon Sep 17 00:00:00 2001 From: glightfoot Date: Sat, 6 Feb 2021 23:27:11 -0500 Subject: [PATCH 5/7] rename mapper_cache to mappercache Signed-off-by: glightfoot --- main.go | 4 ++-- pkg/mapper/mapper_benchmark_test.go | 4 ++-- pkg/mapper/mapper_test.go | 2 +- pkg/{mapper_cache => mappercache}/lru/lru.go | 6 +++--- pkg/{mapper_cache => mappercache}/metrics.go | 2 +- .../randomreplacement/randomreplacement.go | 6 +++--- 6 files changed, 12 insertions(+), 12 deletions(-) rename pkg/{mapper_cache => mappercache}/lru/lru.go (92%) rename pkg/{mapper_cache => mappercache}/metrics.go (98%) rename pkg/{mapper_cache => mappercache}/randomreplacement/randomreplacement.go (92%) diff --git a/main.go b/main.go index 4dfa14b..b6b0c8d 100644 --- a/main.go +++ b/main.go @@ -39,8 +39,8 @@ import ( "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" + "github.com/prometheus/statsd_exporter/pkg/mappercache/lru" + "github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement" ) const ( diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index 5219e1d..978a110 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -18,8 +18,8 @@ import ( "math/rand" "testing" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement" + "github.com/prometheus/statsd_exporter/pkg/mappercache/lru" + "github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement" ) var ( diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index a9ac7e6..8024b22 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -19,7 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru" + "github.com/prometheus/statsd_exporter/pkg/mappercache/lru" ) type mappings []struct { diff --git a/pkg/mapper_cache/lru/lru.go b/pkg/mappercache/lru/lru.go similarity index 92% rename from pkg/mapper_cache/lru/lru.go rename to pkg/mappercache/lru/lru.go index e6739ee..6eeeaef 100644 --- a/pkg/mapper_cache/lru/lru.go +++ b/pkg/mappercache/lru/lru.go @@ -18,12 +18,12 @@ import ( lru2 "github.com/hashicorp/golang-lru" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache" + "github.com/prometheus/statsd_exporter/pkg/mappercache" ) type metricMapperLRUCache struct { cache *lru2.Cache - metrics *mapper_cache.CacheMetrics + metrics *mappercache.CacheMetrics } func NewMetricMapperLRUCache(reg prometheus.Registerer, size int) (*metricMapperLRUCache, error) { @@ -31,7 +31,7 @@ func NewMetricMapperLRUCache(reg prometheus.Registerer, size int) (*metricMapper return nil, nil } - metrics := mapper_cache.NewCacheMetrics(reg) + metrics := mappercache.NewCacheMetrics(reg) cache, err := lru2.New(size) if err != nil { return &metricMapperLRUCache{}, err diff --git a/pkg/mapper_cache/metrics.go b/pkg/mappercache/metrics.go similarity index 98% rename from pkg/mapper_cache/metrics.go rename to pkg/mappercache/metrics.go index f3d76f7..9cc351e 100644 --- a/pkg/mapper_cache/metrics.go +++ b/pkg/mappercache/metrics.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package mapper_cache +package mappercache import "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/mapper_cache/randomreplacement/randomreplacement.go b/pkg/mappercache/randomreplacement/randomreplacement.go similarity index 92% rename from pkg/mapper_cache/randomreplacement/randomreplacement.go rename to pkg/mappercache/randomreplacement/randomreplacement.go index 1dfaa28..fe784de 100644 --- a/pkg/mapper_cache/randomreplacement/randomreplacement.go +++ b/pkg/mappercache/randomreplacement/randomreplacement.go @@ -18,14 +18,14 @@ import ( "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/mapper_cache" + "github.com/prometheus/statsd_exporter/pkg/mappercache" ) type metricMapperRRCache struct { lock sync.RWMutex size int items map[string]interface{} - metrics *mapper_cache.CacheMetrics + metrics *mappercache.CacheMetrics } func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*metricMapperRRCache, error) { @@ -33,7 +33,7 @@ func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*metricMapperR return nil, nil } - metrics := mapper_cache.NewCacheMetrics(reg) + metrics := mappercache.NewCacheMetrics(reg) c := &metricMapperRRCache{ items: make(map[string]interface{}, size+1), size: size, From 8b306c8c7669b0e08ba227e72ae64f025a8d7d76 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Fri, 19 Feb 2021 11:17:27 -0500 Subject: [PATCH 6/7] remove noop cache, add helper function for tests Signed-off-by: glightfoot --- main.go | 2 +- pkg/exporter/exporter_test.go | 2 - pkg/mapper/mapper.go | 44 +++++++++-------- pkg/mapper/mapper_benchmark_test.go | 73 +++-------------------------- pkg/mapper/mapper_cache.go | 14 ------ pkg/mapper/mapper_test.go | 43 ++++++++--------- 6 files changed, 55 insertions(+), 123 deletions(-) diff --git a/main.go b/main.go index b6b0c8d..66487a9 100644 --- a/main.go +++ b/main.go @@ -253,7 +253,7 @@ func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) var cache mapper.MetricMapperCache var err error if cacheSize == 0 { - cache = mapper.NewMetricMapperNoopCache() + return nil, nil } else { switch cacheType { case "lru": diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index bc17648..3948f76 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -182,7 +182,6 @@ func TestNegativeCounter(t *testing.T) { prev := getTelemetryCounterValue(errorCounter) testMapper := mapper.MetricMapper{} - testMapper.UseCache(mapper.NewMetricMapperNoopCache()) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) @@ -318,7 +317,6 @@ mappings: ` testMapper := &mapper.MetricMapper{} - testMapper.UseCache(mapper.NewMetricMapperNoopCache()) err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 302a61f..e610e1a 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -229,12 +229,10 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { m.Defaults = n.Defaults m.Mappings = n.Mappings - // If no cache has been configured, use a noop cache - if m.cache == nil { - m.cache = NewMetricMapperNoopCache() - } // Reset the cache since this function can be used to reload config - m.cache.Reset() + if m.cache != nil { + m.cache.Reset() + } if n.doFSM { var mappings []string @@ -266,6 +264,8 @@ func (m *MetricMapper) InitFromFile(fileName string) error { } func (m *MetricMapper) UseCache(cache MetricMapperCache) { + m.mutex.Lock() + defer m.mutex.Unlock() m.cache = cache } @@ -273,15 +273,13 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy m.mutex.RLock() defer m.mutex.RUnlock() - // default cache to noop cache if used from an uninitialized mapper - if m.cache == nil { - m.cache = NewMetricMapperNoopCache() - } - - result, cached := m.cache.Get(formatKey(statsdMetric, statsdMetricType)) - if cached { - r := result.(MetricMapperCacheResult) - return r.Mapping, r.Labels, r.Matched + // only use a cache if one is present + if m.cache != nil { + result, cached := m.cache.Get(formatKey(statsdMetric, statsdMetricType)) + if cached { + r := result.(MetricMapperCacheResult) + return r.Mapping, r.Labels, r.Matched + } } // glob matching @@ -303,13 +301,17 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy Labels: labels, } // add match to cache - m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) + if m.cache != nil { + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) + } return result, labels, true } else if !m.doRegex { // if there's no regex match type, return immediately - // Add miss cache - m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) + // Add miss to cache + if m.cache != nil { + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) + } return nil, nil, false } } @@ -348,13 +350,17 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy Labels: labels, } // Add Match to cache - m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) + if m.cache != nil { + m.cache.Add(formatKey(statsdMetric, statsdMetricType), r) + } return &mapping, labels, true } // Add Miss to cache - m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) + if m.cache != nil { + m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{}) + } return nil, nil, false } diff --git a/pkg/mapper/mapper_benchmark_test.go b/pkg/mapper/mapper_benchmark_test.go index 978a110..e8adeee 100644 --- a/pkg/mapper/mapper_benchmark_test.go +++ b/pkg/mapper/mapper_benchmark_test.go @@ -585,15 +585,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -700,15 +692,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -845,15 +829,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob) } for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -927,15 +903,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex) } for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -968,16 +936,7 @@ mappings:` + duplicateRules(101, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -1006,15 +965,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) mappings := duplicateMetrics(100, "metric100") for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) - } - mapper.UseCache(cache) + mapper := newTestMapperWithCache(cacheType, 1000) b.Run(cacheType, func(b *testing.B) { err := mapper.InitFromYAMLString(config) @@ -1050,18 +1001,8 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob) }) for _, cacheType := range []string{"lru", "random"} { - mapper := MetricMapper{} - var cache MetricMapperCache - switch cacheType { - case "lru": - cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50) - case "random": - cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50) - } - mapper.UseCache(cache) - + mapper := newTestMapperWithCache(cacheType, 50) b.Run(cacheType, func(b *testing.B) { - mapper := MetricMapper{} err := mapper.InitFromYAMLString(config) if err != nil { b.Fatalf("Config load error: %s %s", config, err) diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 4d98e75..2c37fa3 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -69,20 +69,6 @@ type MetricMapperCache interface { Reset() } -type MetricMapperNoopCache struct{} - -func NewMetricMapperNoopCache() *MetricMapperNoopCache { - return &MetricMapperNoopCache{} -} - -func (m *MetricMapperNoopCache) Get(metricKey string) (interface{}, bool) { - return nil, false -} - -func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) {} - -func (m *MetricMapperNoopCache) Reset() {} - func formatKey(metricString string, metricType MetricType) string { return string(metricType) + "." + metricString } diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 8024b22..3aa6d51 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mappercache/lru" + "github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement" ) type mappings []struct { @@ -36,6 +37,21 @@ type mappings []struct { buckets []float64 } +func newTestMapperWithCache(cacheType string, size int) *MetricMapper { + mapper := MetricMapper{} + var cache MetricMapperCache + switch cacheType { + case "lru": + cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, size) + case "random": + cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, size) + case "none": + return &mapper + } + mapper.UseCache(cache) + return &mapper +} + func TestMetricMapperYAML(t *testing.T) { scenarios := []struct { testName string @@ -1575,11 +1591,6 @@ mappings: labels: app: "$2" ` - mapper := MetricMapper{} - err := mapper.InitFromYAMLString(config) - if err != nil { - t.Fatalf("config load error: %s ", err) - } names := map[string]string{ "aa.bb.aa.myapp": "aa_bb_aa_total", @@ -1588,24 +1599,14 @@ mappings: "aa.bb.dd.myapp": "aa_bb_dd_total", } - lruCache, err := lru.NewMetricMapperLRUCache(mapper.Registerer, len(names)) - if err != nil { - t.Fatalf(err.Error()) - } - - scenarios := []struct { - cache MetricMapperCache - }{ - { - cache: NewMetricMapperNoopCache(), - }, - { - cache: lruCache, - }, - } + scenarios := []string{"none", "lru"} for i, scenario := range scenarios { - mapper.UseCache(scenario.cache) + mapper := newTestMapperWithCache(scenario, 1000) + err := mapper.InitFromYAMLString(config) + if err != nil { + t.Fatalf("config load error: %s ", err) + } // run multiple times to ensure cache works as expected for j := 0; j < 10; j++ { From a197834f64385ea836cbac56761913b01da72f6c Mon Sep 17 00:00:00 2001 From: glightfoot Date: Fri, 19 Feb 2021 14:13:03 -0500 Subject: [PATCH 7/7] Add comments about thread-safety Signed-off-by: glightfoot --- pkg/mapper/mapper.go | 2 ++ pkg/mapper/mapper_cache.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index e610e1a..33ea5c3 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -263,6 +263,8 @@ func (m *MetricMapper) InitFromFile(fileName string) error { return m.InitFromYAMLString(string(mappingStr)) } +// UseCache tells the mapper to use a cache that implements the MetricMapperCache interface. +// This cache MUST be thread-safe! func (m *MetricMapper) UseCache(cache MetricMapperCache) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/mapper/mapper_cache.go b/pkg/mapper/mapper_cache.go index 2c37fa3..9d65f8c 100644 --- a/pkg/mapper/mapper_cache.go +++ b/pkg/mapper/mapper_cache.go @@ -59,7 +59,7 @@ type MetricMapperCacheResult struct { Labels prometheus.Labels } -// MetricMapperCache must be thread-safe and should be instrumented with CacheMetrics +// MetricMapperCache MUST be thread-safe and should be instrumented with CacheMetrics type MetricMapperCache interface { // Get a cached result Get(metricKey string) (interface{}, bool)