Merge pull request #347 from grafana/custom-registry

Pass around custom registry for registering exporter metrics
This commit is contained in:
Matthias Rampke 2020-11-24 09:18:10 +01:00 committed by GitHub
commit 8772c03c0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 65 deletions

View file

@ -657,7 +657,7 @@ mappings:
events := make(chan event.Events) events := make(chan event.Events)
defer close(events) defer close(events)
go func() { go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
}() }()

View file

@ -19,6 +19,7 @@ import (
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/line"
@ -171,7 +172,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err) b.Fatalf("Config load error: %s %s", config, err)
} }
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
// reset benchmark timer to not measure startup costs // reset benchmark timer to not measure startup costs
b.ResetTimer() b.ResetTimer()

View file

@ -438,7 +438,7 @@ func main() {
} }
mapper := &mapper.MetricMapper{MappingsCount: mappingsCount} mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
if *mappingConfig != "" { if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil { if err != nil {
@ -458,7 +458,7 @@ func main() {
mapper.InitCache(*cacheSize, cacheOption) mapper.InitCache(*cacheSize, cacheOption)
} }
exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
if *checkConfig { if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting") level.Info(logger).Log("msg", "Configuration check successful, exiting")

View file

@ -189,10 +189,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
} }
} }
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter { func NewExporter(reg prometheus.Registerer, mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{ return &Exporter{
Mapper: mapper, Mapper: mapper,
Registry: registry.NewRegistry(mapper), Registry: registry.NewRegistry(reg, mapper),
Logger: logger, Logger: logger,
EventsActions: eventsActions, EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped, EventsUnmapped: eventsUnmapped,

View file

@ -184,7 +184,7 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter) updated := getTelemetryCounterValue(errorCounter)
@ -265,7 +265,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err) t.Fatalf("Config load error: %s %s", config, err)
} }
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
@ -323,7 +323,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err) t.Fatalf("Config load error: %s %s", config, err)
} }
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
@ -538,7 +538,7 @@ mappings:
events <- s.in events <- s.in
close(events) close(events)
}() }()
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
@ -593,7 +593,7 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name") errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter) prev := getTelemetryCounterValue(errorCounter)
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter) updated := getTelemetryCounterValue(errorCounter)
@ -660,7 +660,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
} }
@ -674,7 +674,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
}() }()
@ -718,7 +718,7 @@ func TestHistogramUnits(t *testing.T) {
go func() { go func() {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events) ex.Listen(events)
}() }()
@ -755,7 +755,7 @@ func TestCounterIncrement(t *testing.T) {
go func() { go func() {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
}() }()
@ -864,7 +864,7 @@ mappings:
events := make(chan event.Events) events := make(chan event.Events)
defer close(events) defer close(events)
go func() { go func() {
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events) ex.Listen(events)
}() }()
@ -952,7 +952,7 @@ mappings:
} }
func TestHashLabelNames(t *testing.T) { func TestHashLabelNames(t *testing.T) {
r := registry.NewRegistry(nil) r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
// Validate value hash changes and name has doesn't when just the value changes. // Validate value hash changes and name has doesn't when just the value changes.
hash1, _ := r.HashLabels(map[string]string{ hash1, _ := r.HashLabels(map[string]string{
"label": "value1", "label": "value1",
@ -1113,7 +1113,7 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
}, },
} }
r := registry.NewRegistry(nil) r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
for _, s := range scenarios { for _, s := range scenarios {
b.Run(s.name, func(b *testing.B) { b.Run(s.name, func(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {

View file

@ -36,6 +36,7 @@ var (
) )
type MetricMapper struct { type MetricMapper struct {
Registerer prometheus.Registerer
Defaults mapperConfigDefaults `yaml:"defaults"` Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"` Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM FSM *fsm.FSM
@ -252,7 +253,7 @@ func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...C
func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) { func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
if cacheSize == 0 { if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache() m.cache = NewMetricMapperNoopCache(m.Registerer)
} else { } else {
o := cacheOptions{ o := cacheOptions{
cacheType: "lru", cacheType: "lru",
@ -267,9 +268,9 @@ func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
) )
switch o.cacheType { switch o.cacheType {
case "lru": case "lru":
cache, err = NewMetricMapperCache(cacheSize) cache, err = NewMetricMapperCache(m.Registerer, cacheSize)
case "random": case "random":
cache, err = NewMetricMapperRRCache(cacheSize) cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize)
default: default:
err = fmt.Errorf("unsupported cache type %q", o.cacheType) err = fmt.Errorf("unsupported cache type %q", o.cacheType)
} }

View file

@ -20,26 +20,41 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
var ( type CacheMetrics struct {
cacheLength = prometheus.NewGauge( CacheLength prometheus.Gauge
CacheGetsTotal prometheus.Counter
CacheHitsTotal prometheus.Counter
}
func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
var m CacheMetrics
m.CacheLength = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "statsd_metric_mapper_cache_length", Name: "statsd_metric_mapper_cache_length",
Help: "The count of unique metrics currently cached.", Help: "The count of unique metrics currently cached.",
}, },
) )
cacheGetsTotal = prometheus.NewCounter( m.CacheGetsTotal = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_gets_total", Name: "statsd_metric_mapper_cache_gets_total",
Help: "The count of total metric cache gets.", Help: "The count of total metric cache gets.",
}, },
) )
cacheHitsTotal = prometheus.NewCounter( m.CacheHitsTotal = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_hits_total", Name: "statsd_metric_mapper_cache_hits_total",
Help: "The count of total metric cache hits.", Help: "The count of total metric cache hits.",
}, },
) )
)
if reg != nil {
reg.MustRegister(m.CacheLength)
reg.MustRegister(m.CacheGetsTotal)
reg.MustRegister(m.CacheHitsTotal)
}
return &m
}
type cacheOptions struct { type cacheOptions struct {
cacheType string cacheType string
@ -68,25 +83,27 @@ type MetricMapperCache interface {
type MetricMapperLRUCache struct { type MetricMapperLRUCache struct {
MetricMapperCache MetricMapperCache
cache *lru.Cache cache *lru.Cache
metrics *CacheMetrics
} }
type MetricMapperNoopCache struct { type MetricMapperNoopCache struct {
MetricMapperCache MetricMapperCache
metrics *CacheMetrics
} }
func NewMetricMapperCache(size int) (*MetricMapperLRUCache, error) { func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) {
cacheLength.Set(0) metrics := NewCacheMetrics(reg)
cache, err := lru.New(size) cache, err := lru.New(size)
if err != nil { if err != nil {
return &MetricMapperLRUCache{}, err return &MetricMapperLRUCache{}, err
} }
return &MetricMapperLRUCache{cache: cache}, nil return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil
} }
func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
cacheGetsTotal.Inc() m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok { if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok {
cacheHitsTotal.Inc() m.metrics.CacheHitsTotal.Inc()
return result.(*MetricMapperCacheResult), true return result.(*MetricMapperCacheResult), true
} else { } else {
return nil, false return nil, false
@ -104,16 +121,15 @@ func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricTyp
} }
func (m *MetricMapperLRUCache) trackCacheLength() { func (m *MetricMapperLRUCache) trackCacheLength() {
cacheLength.Set(float64(m.cache.Len())) m.metrics.CacheLength.Set(float64(m.cache.Len()))
} }
func formatKey(metricString string, metricType MetricType) string { func formatKey(metricString string, metricType MetricType) string {
return string(metricType) + "." + metricString return string(metricType) + "." + metricString
} }
func NewMetricMapperNoopCache() *MetricMapperNoopCache { func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache {
cacheLength.Set(0) return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)}
return &MetricMapperNoopCache{}
} }
func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) { func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
@ -133,13 +149,15 @@ type MetricMapperRRCache struct {
lock sync.RWMutex lock sync.RWMutex
size int size int
items map[string]*MetricMapperCacheResult items map[string]*MetricMapperCacheResult
metrics *CacheMetrics
} }
func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) { func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) {
cacheLength.Set(0) metrics := NewCacheMetrics(reg)
c := &MetricMapperRRCache{ c := &MetricMapperRRCache{
items: make(map[string]*MetricMapperCacheResult, size+1), items: make(map[string]*MetricMapperCacheResult, size+1),
size: size, size: size,
metrics: metrics,
} }
return c, nil return c, nil
} }
@ -188,11 +206,5 @@ func (m *MetricMapperRRCache) trackCacheLength() {
m.lock.RLock() m.lock.RLock()
length := len(m.items) length := len(m.items)
m.lock.RUnlock() m.lock.RUnlock()
cacheLength.Set(float64(length)) m.metrics.CacheLength.Set(float64(length))
}
func init() {
prometheus.MustRegister(cacheLength)
prometheus.MustRegister(cacheGetsTotal)
prometheus.MustRegister(cacheHitsTotal)
} }

View file

@ -40,6 +40,7 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
} }
type Registry struct { type Registry struct {
Registerer prometheus.Registerer
Metrics map[string]metrics.Metric Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper Mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct // The below value and label variables are allocated in the registry struct
@ -49,8 +50,9 @@ type Registry struct {
Hasher hash.Hash64 Hasher hash.Hash64
} }
func NewRegistry(mapper *mapper.MetricMapper) *Registry { func NewRegistry(reg prometheus.Registerer, mapper *mapper.MetricMapper) *Registry {
return &Registry{ return &Registry{
Registerer: reg,
Metrics: make(map[string]metrics.Metric), Metrics: make(map[string]metrics.Metric),
Mapper: mapper, Mapper: mapper,
Hasher: fnv.New64a(), Hasher: fnv.New64a(),
@ -170,7 +172,7 @@ func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help
Help: help, Help: help,
}, labelNames) }, labelNames)
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { if err := r.Registerer.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err return nil, err
} }
} else { } else {
@ -206,7 +208,7 @@ func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help st
Help: help, Help: help,
}, labelNames) }, labelNames)
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { if err := r.Registerer.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err return nil, err
} }
} else { } else {