Pass around custom registry for registering exporter metrics

Importers of pkg/exporter may not want for series to be imported into
the default registry. This commit makes it possible to provide a custom
registry for metrics registration.

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
This commit is contained in:
Robert Fratto 2020-11-20 14:14:14 -05:00
parent dcd95d01df
commit b5deeda251
8 changed files with 81 additions and 65 deletions

View file

@ -657,7 +657,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

View file

@ -19,6 +19,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
@ -171,7 +172,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err)
}
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
// reset benchmark timer to not measure startup costs
b.ResetTimer()

View file

@ -438,7 +438,7 @@ func main() {
}
mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil {
@ -458,7 +458,7 @@ func main() {
mapper.InitCache(*cacheSize, cacheOption)
}
exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")

View file

@ -189,10 +189,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
}
}
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
func NewExporter(reg prometheus.Registerer, mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{
Mapper: mapper,
Registry: registry.NewRegistry(mapper),
Registry: registry.NewRegistry(reg, mapper),
Logger: logger,
EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped,

View file

@ -184,7 +184,7 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter)
@ -265,7 +265,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -323,7 +323,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -538,7 +538,7 @@ mappings:
events <- s.in
close(events)
}()
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -593,7 +593,7 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter)
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter)
@ -660,7 +660,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}
@ -674,7 +674,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
@ -718,7 +718,7 @@ func TestHistogramUnits(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events)
}()
@ -755,7 +755,7 @@ func TestCounterIncrement(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
@ -864,7 +864,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
@ -952,7 +952,7 @@ mappings:
}
func TestHashLabelNames(t *testing.T) {
r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
// Validate value hash changes and name has doesn't when just the value changes.
hash1, _ := r.HashLabels(map[string]string{
"label": "value1",
@ -1113,7 +1113,7 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
},
}
r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
for _, s := range scenarios {
b.Run(s.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {

View file

@ -36,13 +36,14 @@ var (
)
type MetricMapper struct {
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex
Registerer prometheus.Registerer
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex
MappingsCount prometheus.Gauge
}
@ -252,7 +253,7 @@ func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...C
func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache()
m.cache = NewMetricMapperNoopCache(m.Registerer)
} else {
o := cacheOptions{
cacheType: "lru",
@ -267,9 +268,9 @@ func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
)
switch o.cacheType {
case "lru":
cache, err = NewMetricMapperCache(cacheSize)
cache, err = NewMetricMapperCache(m.Registerer, cacheSize)
case "random":
cache, err = NewMetricMapperRRCache(cacheSize)
cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", o.cacheType)
}

View file

@ -20,26 +20,41 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
var (
cacheLength = prometheus.NewGauge(
type CacheMetrics struct {
CacheLength prometheus.Gauge
CacheGetsTotal prometheus.Counter
CacheHitsTotal prometheus.Counter
}
func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
var m CacheMetrics
m.CacheLength = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "statsd_metric_mapper_cache_length",
Help: "The count of unique metrics currently cached.",
},
)
cacheGetsTotal = prometheus.NewCounter(
m.CacheGetsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_gets_total",
Help: "The count of total metric cache gets.",
},
)
cacheHitsTotal = prometheus.NewCounter(
m.CacheHitsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_hits_total",
Help: "The count of total metric cache hits.",
},
)
)
if reg != nil {
reg.MustRegister(m.CacheLength)
reg.MustRegister(m.CacheGetsTotal)
reg.MustRegister(m.CacheHitsTotal)
}
return &m
}
type cacheOptions struct {
cacheType string
@ -67,26 +82,28 @@ type MetricMapperCache interface {
type MetricMapperLRUCache struct {
MetricMapperCache
cache *lru.Cache
cache *lru.Cache
metrics *CacheMetrics
}
type MetricMapperNoopCache struct {
MetricMapperCache
metrics *CacheMetrics
}
func NewMetricMapperCache(size int) (*MetricMapperLRUCache, error) {
cacheLength.Set(0)
func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) {
metrics := NewCacheMetrics(reg)
cache, err := lru.New(size)
if err != nil {
return &MetricMapperLRUCache{}, err
}
return &MetricMapperLRUCache{cache: cache}, nil
return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil
}
func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
cacheGetsTotal.Inc()
m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok {
cacheHitsTotal.Inc()
m.metrics.CacheHitsTotal.Inc()
return result.(*MetricMapperCacheResult), true
} else {
return nil, false
@ -104,16 +121,15 @@ func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricTyp
}
func (m *MetricMapperLRUCache) trackCacheLength() {
cacheLength.Set(float64(m.cache.Len()))
m.metrics.CacheLength.Set(float64(m.cache.Len()))
}
func formatKey(metricString string, metricType MetricType) string {
return string(metricType) + "." + metricString
}
func NewMetricMapperNoopCache() *MetricMapperNoopCache {
cacheLength.Set(0)
return &MetricMapperNoopCache{}
func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache {
return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)}
}
func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
@ -130,16 +146,18 @@ func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricTy
type MetricMapperRRCache struct {
MetricMapperCache
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
metrics *CacheMetrics
}
func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) {
cacheLength.Set(0)
func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) {
metrics := NewCacheMetrics(reg)
c := &MetricMapperRRCache{
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
metrics: metrics,
}
return c, nil
}
@ -188,11 +206,5 @@ func (m *MetricMapperRRCache) trackCacheLength() {
m.lock.RLock()
length := len(m.items)
m.lock.RUnlock()
cacheLength.Set(float64(length))
}
func init() {
prometheus.MustRegister(cacheLength)
prometheus.MustRegister(cacheGetsTotal)
prometheus.MustRegister(cacheHitsTotal)
m.metrics.CacheLength.Set(float64(length))
}

View file

@ -40,8 +40,9 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
}
type Registry struct {
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
Registerer prometheus.Registerer
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct
// so that we don't have to allocate them every time have to compute a label
// hash.
@ -49,11 +50,12 @@ type Registry struct {
Hasher hash.Hash64
}
func NewRegistry(mapper *mapper.MetricMapper) *Registry {
func NewRegistry(reg prometheus.Registerer, mapper *mapper.MetricMapper) *Registry {
return &Registry{
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
Registerer: reg,
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
}
}
@ -170,7 +172,7 @@ func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err
}
} else {
@ -206,7 +208,7 @@ func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help st
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err
}
} else {