move lru and rr mapper caches to their own package and make mapper_cache a better an interface for implementing externally`

Signed-off-by: glightfoot <glightfoot@rsglab.com>
This commit is contained in:
glightfoot 2021-02-06 19:22:29 -05:00
parent fbcadbf71b
commit 0099df7f71
11 changed files with 403 additions and 246 deletions

View file

@ -650,7 +650,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}

View file

@ -20,6 +20,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
@ -167,7 +168,7 @@ mappings:
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}

57
main.go
View file

@ -39,6 +39,8 @@ import (
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement"
)
const (
@ -206,7 +208,7 @@ func serveHTTP(mux http.Handler, listenAddress string, logger log.Logger) {
os.Exit(1)
}
func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, logger log.Logger) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP)
@ -218,12 +220,12 @@ func sighupConfigReloader(fileName string, mapper *mapper.MetricMapper, cacheSiz
level.Info(logger).Log("msg", "Received signal, attempting reload", "signal", s)
reloadConfig(fileName, mapper, cacheSize, logger, option)
reloadConfig(fileName, mapper, logger)
}
}
func reloadConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
err := mapper.InitFromFile(fileName, cacheSize, option)
func reloadConfig(fileName string, mapper *mapper.MetricMapper, logger log.Logger) {
err := mapper.InitFromFile(fileName)
if err != nil {
level.Info(logger).Log("msg", "Error reloading config", "error", err)
configLoads.WithLabelValues("failure").Inc()
@ -247,6 +249,29 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger
return nil
}
func getCache(cacheSize int, cacheType string, registerer prometheus.Registerer) (mapper.MetricMapperCache, error) {
var cache mapper.MetricMapperCache
var err error
if cacheSize == 0 {
cache = mapper.NewMetricMapperNoopCache()
} else {
switch cacheType {
case "lru":
cache, err = lru.NewMetricMapperLRUCache(registerer, cacheSize)
case "random":
cache, err = randomreplacement.NewMetricMapperRRCache(registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", cacheType)
}
if err != nil {
return nil, err
}
}
return cache, nil
}
func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
@ -293,8 +318,6 @@ func main() {
parser.EnableSignalFXParsing()
}
cacheOption := mapper.WithCacheType(*cacheType)
level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "context", version.BuildContext())
@ -302,15 +325,23 @@ func main() {
defer close(events)
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)
mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
thisMapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
cache, err := getCache(*cacheSize, *cacheType, thisMapper.Registerer)
if err != nil {
level.Error(logger).Log("msg", "Unable to setup metric mapper cache", "error", err)
os.Exit(1)
}
thisMapper.UseCache(cache)
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
err := thisMapper.InitFromFile(*mappingConfig)
if err != nil {
level.Error(logger).Log("msg", "error loading config", "error", err)
os.Exit(1)
}
if *dumpFSMPath != "" {
err := dumpFSM(mapper, *dumpFSMPath, logger)
err := dumpFSM(thisMapper, *dumpFSMPath, logger)
if err != nil {
level.Error(logger).Log("msg", "error dumping FSM", "error", err)
// Failure to dump the FSM is an error (the user asked for it and it
@ -318,11 +349,9 @@ func main() {
// afterwards).
}
}
} else {
mapper.InitCache(*cacheSize, cacheOption)
}
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")
@ -489,7 +518,7 @@ func main() {
return
}
level.Info(logger).Log("msg", "Received lifecycle api reload, attempting reload")
reloadConfig(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
reloadConfig(*mappingConfig, thisMapper, logger)
}
})
mux.HandleFunc("/-/quit", func(w http.ResponseWriter, r *http.Request) {
@ -518,7 +547,7 @@ func main() {
go serveHTTP(mux, *listenAddress, logger)
go sighupConfigReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
go sighupConfigReloader(*mappingConfig, thisMapper, logger)
go exporter.Listen(events)
signals := make(chan os.Signal, 1)

View file

@ -182,7 +182,7 @@ func TestNegativeCounter(t *testing.T) {
prev := getTelemetryCounterValue(errorCounter)
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
testMapper.UseCache(mapper.NewMetricMapperNoopCache())
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
@ -260,7 +260,7 @@ mappings:
name: "histogram_test"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
@ -318,7 +318,8 @@ mappings:
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
testMapper.UseCache(mapper.NewMetricMapperNoopCache())
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
@ -528,7 +529,7 @@ mappings:
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
@ -585,7 +586,7 @@ mappings:
name: "${1}"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
@ -658,7 +659,6 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
}()
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
@ -672,7 +672,6 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
@ -717,7 +716,6 @@ func TestHistogramUnits(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events)
@ -754,7 +752,6 @@ func TestCounterIncrement(t *testing.T) {
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
@ -857,7 +854,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}

View file

@ -71,7 +71,7 @@ var defaultQuantiles = []metricObjective{
{Quantile: 0.99, Error: 0.001},
}
func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, options ...CacheOption) error {
func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
var n MetricMapper
if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil {
@ -230,7 +230,13 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op
m.Defaults = n.Defaults
m.Mappings = n.Mappings
m.InitCache(cacheSize, options...)
// If no cache has been configured, use a noop cache
if m.cache == nil {
m.cache = NewMetricMapperNoopCache()
}
// Reset the cache since this function can be used to reload config
m.cache.Reset()
if n.doFSM {
var mappings []string
@ -252,53 +258,34 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int, op
return nil
}
func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...CacheOption) error {
func (m *MetricMapper) InitFromFile(fileName string) error {
mappingStr, err := ioutil.ReadFile(fileName)
if err != nil {
return err
}
return m.InitFromYAMLString(string(mappingStr), cacheSize, options...)
return m.InitFromYAMLString(string(mappingStr))
}
func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache(m.Registerer)
} else {
o := cacheOptions{
cacheType: "lru",
}
for _, f := range options {
f(&o)
}
var (
cache MetricMapperCache
err error
)
switch o.cacheType {
case "lru":
cache, err = NewMetricMapperCache(m.Registerer, cacheSize)
case "random":
cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", o.cacheType)
}
if err != nil {
log.Fatalf("Unable to setup metric cache. Caused by: %s", err)
}
func (m *MetricMapper) UseCache(cache MetricMapperCache) {
m.cache = cache
}
}
func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
result, cached := m.cache.Get(statsdMetric, statsdMetricType)
if cached {
return result.Mapping, result.Labels, result.Matched
// default cache to noop cache if used from an uninitialized mapper
if m.cache == nil {
m.cache = NewMetricMapperNoopCache()
}
result, cached := m.cache.Get(formatKey(statsdMetric, statsdMetricType))
if cached {
r := result.(MetricMapperCacheResult)
return r.Mapping, r.Labels, r.Matched
}
// glob matching
if m.doFSM {
finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType))
@ -312,12 +299,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
labels[result.labelKeys[index]] = formatter.Format(captures)
}
m.cache.AddMatch(statsdMetric, statsdMetricType, result, labels)
r := MetricMapperCacheResult{
Mapping: result,
Matched: true,
Labels: labels,
}
// add match to cache
m.cache.Add(formatKey(statsdMetric, statsdMetricType), r)
return result, labels, true
} else if !m.doRegex {
// if there's no regex match type, return immediately
m.cache.AddMiss(statsdMetric, statsdMetricType)
// Add miss cache
m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{})
return nil, nil, false
}
}
@ -350,12 +344,19 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
labels[label] = string(value)
}
m.cache.AddMatch(statsdMetric, statsdMetricType, &mapping, labels)
r := MetricMapperCacheResult{
Mapping: &mapping,
Matched: true,
Labels: labels,
}
// Add Match to cache
m.cache.Add(formatKey(statsdMetric, statsdMetricType), r)
return &mapping, labels, true
}
m.cache.AddMiss(statsdMetric, statsdMetricType)
// Add Miss to cache
m.cache.Add(formatKey(statsdMetric, statsdMetricType), MetricMapperCacheResult{})
return nil, nil, false
}

View file

@ -17,6 +17,9 @@ import (
"fmt"
"math/rand"
"testing"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache/randomreplacement"
)
var (
@ -105,7 +108,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -169,7 +172,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -240,7 +243,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -304,7 +307,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -331,7 +334,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -359,7 +362,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -385,7 +388,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -412,7 +415,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -438,7 +441,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -465,7 +468,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -502,7 +505,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -540,7 +543,7 @@ mappings:
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -561,7 +564,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -582,9 +585,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -609,7 +621,7 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -632,9 +644,19 @@ mappings:` + duplicateRules(10, ruleTemplateSingleMatchRegex)
}
for _, cacheType := range []string{"lru", "random"} {
mapper := MetricMapper{}
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -657,7 +679,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -678,9 +700,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -703,7 +734,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -726,7 +757,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -749,7 +780,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -772,7 +803,7 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchRegex)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -793,7 +824,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -814,9 +845,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchGlob)
}
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -841,7 +881,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -864,7 +904,7 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex)
}
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -887,9 +927,18 @@ mappings:` + duplicateRules(100, ruleTemplateMultipleMatchRegex)
}
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -914,14 +963,24 @@ func duplicateMetrics(count int, template string) []string {
func BenchmarkGlob100RulesCached100Metrics(b *testing.B) {
config := `---
mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
mappings:` + duplicateRules(101, ruleTemplateSingleMatchGlob)
mappings := duplicateMetrics(100, "metric100")
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 1000, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 1000)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -947,9 +1006,18 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
mappings := duplicateMetrics(100, "metric100")
for _, cacheType := range []string{"lru", "random"} {
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType))
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}
@ -982,9 +1050,19 @@ mappings:` + duplicateRules(100, ruleTemplateSingleMatchGlob)
})
for _, cacheType := range []string{"lru", "random"} {
mapper := MetricMapper{}
var cache MetricMapperCache
switch cacheType {
case "lru":
cache, _ = lru.NewMetricMapperLRUCache(mapper.Registerer, 50)
case "random":
cache, _ = randomreplacement.NewMetricMapperRRCache(mapper.Registerer, 50)
}
mapper.UseCache(cache)
b.Run(cacheType, func(b *testing.B) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 50, WithCacheType(cacheType))
err := mapper.InitFromYAMLString(config)
if err != nil {
b.Fatalf("Config load error: %s %s", config, err)
}

View file

@ -14,9 +14,6 @@
package mapper
import (
"sync"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
)
@ -56,155 +53,38 @@ func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
return &m
}
type cacheOptions struct {
cacheType string
}
type CacheOption func(*cacheOptions)
func WithCacheType(cacheType string) CacheOption {
return func(o *cacheOptions) {
o.cacheType = cacheType
}
}
type MetricMapperCacheResult struct {
Mapping *MetricMapping
Matched bool
Labels prometheus.Labels
}
// MetricMapperCache must be thread-safe and should be instrumented with CacheMetrics
type MetricMapperCache interface {
Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool)
AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels)
AddMiss(metricString string, metricType MetricType)
// Get a cached result
Get(metricKey string) (interface{}, bool)
// Add a statsd MetricMapperResult to the cache
Add(metricKey string, result interface{}) // Add an item to the cache
// Reset clears the cache for config reloads
Reset()
}
type MetricMapperLRUCache struct {
MetricMapperCache
cache *lru.Cache
metrics *CacheMetrics
type MetricMapperNoopCache struct{}
func NewMetricMapperNoopCache() *MetricMapperNoopCache {
return &MetricMapperNoopCache{}
}
type MetricMapperNoopCache struct {
MetricMapperCache
metrics *CacheMetrics
}
func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) {
metrics := NewCacheMetrics(reg)
cache, err := lru.New(size)
if err != nil {
return &MetricMapperLRUCache{}, err
}
return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil
}
func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok {
m.metrics.CacheHitsTotal.Inc()
return result.(*MetricMapperCacheResult), true
} else {
func (m *MetricMapperNoopCache) Get(metricKey string) (interface{}, bool) {
return nil, false
}
func (m *MetricMapperNoopCache) Add(metricKey string, result interface{}) {
return
}
func (m *MetricMapperLRUCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) {
go m.trackCacheLength()
m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels})
}
func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricType) {
go m.trackCacheLength()
m.cache.Add(formatKey(metricString, metricType), &MetricMapperCacheResult{Matched: false})
}
func (m *MetricMapperLRUCache) trackCacheLength() {
m.metrics.CacheLength.Set(float64(m.cache.Len()))
}
func (m *MetricMapperNoopCache) Reset() {}
func formatKey(metricString string, metricType MetricType) string {
return string(metricType) + "." + metricString
}
func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache {
return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)}
}
func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
return nil, false
}
func (m *MetricMapperNoopCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) {
return
}
func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricType) {
return
}
type MetricMapperRRCache struct {
MetricMapperCache
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
metrics *CacheMetrics
}
func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) {
metrics := NewCacheMetrics(reg)
c := &MetricMapperRRCache{
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
metrics: metrics,
}
return c, nil
}
func (m *MetricMapperRRCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
key := formatKey(metricString, metricType)
m.lock.RLock()
result, ok := m.items[key]
m.lock.RUnlock()
return result, ok
}
func (m *MetricMapperRRCache) addItem(metricString string, metricType MetricType, result *MetricMapperCacheResult) {
go m.trackCacheLength()
key := formatKey(metricString, metricType)
m.lock.Lock()
m.items[key] = result
// evict an item if needed
if len(m.items) > m.size {
for k := range m.items {
delete(m.items, k)
break
}
}
m.lock.Unlock()
}
func (m *MetricMapperRRCache) AddMatch(metricString string, metricType MetricType, mapping *MetricMapping, labels prometheus.Labels) {
e := &MetricMapperCacheResult{Mapping: mapping, Matched: true, Labels: labels}
m.addItem(metricString, metricType, e)
}
func (m *MetricMapperRRCache) AddMiss(metricString string, metricType MetricType) {
e := &MetricMapperCacheResult{Matched: false}
m.addItem(metricString, metricType, e)
}
func (m *MetricMapperRRCache) trackCacheLength() {
m.lock.RLock()
length := len(m.items)
m.lock.RUnlock()
m.metrics.CacheLength.Set(float64(length))
}

View file

@ -18,6 +18,8 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache/lru"
)
type mappings []struct {
@ -1379,12 +1381,15 @@ mappings:
}
mapper := MetricMapper{}
cache, _ := lru.NewMetricMapperLRUCache(mapper.Registerer, 1000)
mapper.UseCache(cache)
for i, scenario := range scenarios {
if scenario.testName == "" {
t.Fatalf("Missing testName in scenario %+v", scenario)
}
t.Run(scenario.testName, func(t *testing.T) {
err := mapper.InitFromYAMLString(scenario.config, 1000)
err := mapper.InitFromYAMLString(scenario.config)
if err != nil && !scenario.configBad {
t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err)
}
@ -1542,7 +1547,7 @@ mappings:
}
t.Run(scenario.testName, func(t *testing.T) {
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(scenario.config, 0)
err := mapper.InitFromYAMLString(scenario.config)
if err != nil && !scenario.configBad {
t.Fatalf("%d. Config load error: %s %s", i, scenario.config, err)
}
@ -1571,7 +1576,7 @@ mappings:
app: "$2"
`
mapper := MetricMapper{}
err := mapper.InitFromYAMLString(config, 0)
err := mapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("config load error: %s ", err)
}
@ -1583,19 +1588,24 @@ mappings:
"aa.bb.dd.myapp": "aa_bb_dd_total",
}
lruCache, err := lru.NewMetricMapperLRUCache(mapper.Registerer, len(names))
if err != nil {
t.Fatalf(err.Error())
}
scenarios := []struct {
cacheSize int
cache MetricMapperCache
}{
{
cacheSize: 0,
cache: NewMetricMapperNoopCache(),
},
{
cacheSize: len(names),
cache: lruCache,
},
}
for i, scenario := range scenarios {
mapper.InitCache(scenario.cacheSize)
mapper.UseCache(scenario.cache)
// run multiple times to ensure cache works as expected
for j := 0; j < 10; j++ {

View file

@ -0,0 +1,52 @@
package lru
import (
"github.com/prometheus/client_golang/prometheus"
lru2 "github.com/hashicorp/golang-lru"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache"
)
type metricMapperLRUCache struct {
cache *lru2.Cache
metrics *mapper_cache.CacheMetrics
}
func NewMetricMapperLRUCache(reg prometheus.Registerer, size int) (*metricMapperLRUCache, error) {
if size <= 0 {
return nil, nil
}
metrics := mapper_cache.NewCacheMetrics(reg)
cache, err := lru2.New(size)
if err != nil {
return &metricMapperLRUCache{}, err
}
return &metricMapperLRUCache{metrics: metrics, cache: cache}, nil
}
func (m *metricMapperLRUCache) Get(metricKey string) (interface{}, bool) {
m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(metricKey); ok {
m.metrics.CacheHitsTotal.Inc()
return result, true
} else {
return nil, false
}
}
func (m *metricMapperLRUCache) Add(metricKey string, result interface{}) {
go m.trackCacheLength()
m.cache.Add(metricKey, result)
}
func (m *metricMapperLRUCache) trackCacheLength() {
m.metrics.CacheLength.Set(float64(m.cache.Len()))
}
func (m *metricMapperLRUCache) Reset() {
m.cache.Purge()
m.metrics.CacheLength.Set(0)
}

View file

@ -0,0 +1,39 @@
package mapper_cache
import "github.com/prometheus/client_golang/prometheus"
type CacheMetrics struct {
CacheLength prometheus.Gauge
CacheGetsTotal prometheus.Counter
CacheHitsTotal prometheus.Counter
}
func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
var m CacheMetrics
m.CacheLength = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "statsd_metric_mapper_cache_length",
Help: "The count of unique metrics currently cached.",
},
)
m.CacheGetsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_gets_total",
Help: "The count of total metric cache gets.",
},
)
m.CacheHitsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_hits_total",
Help: "The count of total metric cache hits.",
},
)
if reg != nil {
reg.MustRegister(m.CacheLength)
reg.MustRegister(m.CacheGetsTotal)
reg.MustRegister(m.CacheHitsTotal)
}
return &m
}

View file

@ -0,0 +1,70 @@
package randomreplacement
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/mapper_cache"
)
type metricMapperRRCache struct {
lock sync.RWMutex
size int
items map[string]interface{}
metrics *mapper_cache.CacheMetrics
}
func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*metricMapperRRCache, error) {
if size <= 0 {
return nil, nil
}
metrics := mapper_cache.NewCacheMetrics(reg)
c := &metricMapperRRCache{
items: make(map[string]interface{}, size+1),
size: size,
metrics: metrics,
}
return c, nil
}
func (m *metricMapperRRCache) Get(metricKey string) (interface{}, bool) {
m.lock.RLock()
result, ok := m.items[metricKey]
m.lock.RUnlock()
return result, ok
}
func (m *metricMapperRRCache) Add(metricKey string, result interface{}) {
go m.trackCacheLength()
m.lock.Lock()
m.items[metricKey] = result
// evict an item if needed
if len(m.items) > m.size {
for k := range m.items {
delete(m.items, k)
break
}
}
m.lock.Unlock()
}
func (m *metricMapperRRCache) Reset() {
m.lock.Lock()
defer m.lock.Unlock()
m.items = make(map[string]interface{}, m.size+1)
m.metrics.CacheLength.Set(0)
}
func (m *metricMapperRRCache) trackCacheLength() {
m.lock.RLock()
length := len(m.items)
m.lock.RUnlock()
m.metrics.CacheLength.Set(float64(length))
}