mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-11-14 19:31:11 +00:00
Merge pull request #223 from claytono/rework-registry
Rework metric registration and tracking
This commit is contained in:
commit
50d5932124
4 changed files with 600 additions and 396 deletions
385
exporter.go
385
exporter.go
|
@ -15,12 +15,9 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sort"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -28,7 +25,6 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
|
||||||
|
|
||||||
"github.com/prometheus/statsd_exporter/pkg/clock"
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
|
@ -39,12 +35,6 @@ const (
|
||||||
regErrF = "Failed to update metric %q. Error: %s"
|
regErrF = "Failed to update metric %q. Error: %s"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
hash = fnv.New64a()
|
|
||||||
strBuf bytes.Buffer // Used for hashing.
|
|
||||||
intBuf = make([]byte, 8)
|
|
||||||
)
|
|
||||||
|
|
||||||
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
|
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
|
||||||
// This allows incoming metrics to have inconsistent label sets
|
// This allows incoming metrics to have inconsistent label sets
|
||||||
type uncheckedCollector struct {
|
type uncheckedCollector struct {
|
||||||
|
@ -56,250 +46,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
|
||||||
u.c.Collect(c)
|
u.c.Collect(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
CounterMetricType metricType = iota
|
|
||||||
GaugeMetricType
|
|
||||||
SummaryMetricType
|
|
||||||
HistogramMetricType
|
|
||||||
)
|
|
||||||
|
|
||||||
type metricChecker interface {
|
|
||||||
metricConflicts(string, metricType) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func getSortedLabelNames(labels prometheus.Labels) []string {
|
|
||||||
names := make([]string, 0, len(labels))
|
|
||||||
for labelName := range labels {
|
|
||||||
names = append(names, labelName)
|
|
||||||
}
|
|
||||||
sort.Strings(names)
|
|
||||||
return names
|
|
||||||
}
|
|
||||||
|
|
||||||
func getContainerMapKey(metricName string, labelNames []string) string {
|
|
||||||
return metricName + "," + strings.Join(labelNames, ",")
|
|
||||||
}
|
|
||||||
|
|
||||||
// hashNameAndLabels returns a hash value of the provided name string and all
|
|
||||||
// the label names and values in the provided labels map.
|
|
||||||
//
|
|
||||||
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
|
|
||||||
// allocations.)
|
|
||||||
func hashNameAndLabels(name string, labelNames []string, labels prometheus.Labels) uint64 {
|
|
||||||
hash.Reset()
|
|
||||||
strBuf.Reset()
|
|
||||||
strBuf.WriteString(name)
|
|
||||||
strBuf.WriteByte(model.SeparatorByte)
|
|
||||||
|
|
||||||
for _, labelName := range labelNames {
|
|
||||||
strBuf.WriteString(labelName)
|
|
||||||
strBuf.WriteByte(model.SeparatorByte)
|
|
||||||
strBuf.WriteString(labels[labelName])
|
|
||||||
strBuf.WriteByte(model.SeparatorByte)
|
|
||||||
}
|
|
||||||
|
|
||||||
hash.Write(strBuf.Bytes())
|
|
||||||
return hash.Sum64()
|
|
||||||
}
|
|
||||||
|
|
||||||
type CounterContainer struct {
|
|
||||||
// metric name
|
|
||||||
Elements map[string]*prometheus.CounterVec
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCounterContainer() *CounterContainer {
|
|
||||||
return &CounterContainer{
|
|
||||||
Elements: make(map[string]*prometheus.CounterVec),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CounterContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string) (prometheus.Counter, error) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
|
|
||||||
counterVec, ok := c.Elements[mapKey]
|
|
||||||
if !ok {
|
|
||||||
if mc.metricConflicts(metricName, CounterMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
metricsCount.WithLabelValues("counter").Inc()
|
|
||||||
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
||||||
Name: metricName,
|
|
||||||
Help: help,
|
|
||||||
}, labelNames)
|
|
||||||
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.Elements[mapKey] = counterVec
|
|
||||||
}
|
|
||||||
return counterVec.GetMetricWith(labels)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CounterContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
if _, ok := c.Elements[mapKey]; ok {
|
|
||||||
c.Elements[mapKey].Delete(labels)
|
|
||||||
metricsCount.WithLabelValues("counter").Dec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type GaugeContainer struct {
|
|
||||||
Elements map[string]*prometheus.GaugeVec
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewGaugeContainer() *GaugeContainer {
|
|
||||||
return &GaugeContainer{
|
|
||||||
Elements: make(map[string]*prometheus.GaugeVec),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *GaugeContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string) (prometheus.Gauge, error) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
|
|
||||||
gaugeVec, ok := c.Elements[mapKey]
|
|
||||||
if !ok {
|
|
||||||
if mc.metricConflicts(metricName, GaugeMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
metricsCount.WithLabelValues("gauge").Inc()
|
|
||||||
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
||||||
Name: metricName,
|
|
||||||
Help: help,
|
|
||||||
}, labelNames)
|
|
||||||
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.Elements[mapKey] = gaugeVec
|
|
||||||
}
|
|
||||||
return gaugeVec.GetMetricWith(labels)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *GaugeContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
if _, ok := c.Elements[mapKey]; ok {
|
|
||||||
c.Elements[mapKey].Delete(labels)
|
|
||||||
metricsCount.WithLabelValues("gauge").Dec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type SummaryContainer struct {
|
|
||||||
Elements map[string]*prometheus.SummaryVec
|
|
||||||
mapper *mapper.MetricMapper
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer {
|
|
||||||
return &SummaryContainer{
|
|
||||||
Elements: make(map[string]*prometheus.SummaryVec),
|
|
||||||
mapper: mapper,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SummaryContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
|
|
||||||
summaryVec, ok := c.Elements[mapKey]
|
|
||||||
if !ok {
|
|
||||||
if mc.metricConflicts(metricName, SummaryMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
if mc.metricConflicts(metricName+"_sum", SummaryMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
if mc.metricConflicts(metricName+"_count", SummaryMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
metricsCount.WithLabelValues("summary").Inc()
|
|
||||||
quantiles := c.mapper.Defaults.Quantiles
|
|
||||||
if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 {
|
|
||||||
quantiles = mapping.Quantiles
|
|
||||||
}
|
|
||||||
objectives := make(map[float64]float64)
|
|
||||||
for _, q := range quantiles {
|
|
||||||
objectives[q.Quantile] = q.Error
|
|
||||||
}
|
|
||||||
// In the case of no mapping file, explicitly define the default quantiles
|
|
||||||
if len(objectives) == 0 {
|
|
||||||
objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
|
|
||||||
}
|
|
||||||
summaryVec = prometheus.NewSummaryVec(
|
|
||||||
prometheus.SummaryOpts{
|
|
||||||
Name: metricName,
|
|
||||||
Help: help,
|
|
||||||
Objectives: objectives,
|
|
||||||
}, labelNames)
|
|
||||||
if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.Elements[mapKey] = summaryVec
|
|
||||||
}
|
|
||||||
return summaryVec.GetMetricWith(labels)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SummaryContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
if _, ok := c.Elements[mapKey]; ok {
|
|
||||||
c.Elements[mapKey].Delete(labels)
|
|
||||||
metricsCount.WithLabelValues("summary").Dec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type HistogramContainer struct {
|
|
||||||
Elements map[string]*prometheus.HistogramVec
|
|
||||||
mapper *mapper.MetricMapper
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer {
|
|
||||||
return &HistogramContainer{
|
|
||||||
Elements: make(map[string]*prometheus.HistogramVec),
|
|
||||||
mapper: mapper,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *HistogramContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
|
|
||||||
histogramVec, ok := c.Elements[mapKey]
|
|
||||||
if !ok {
|
|
||||||
if mc.metricConflicts(metricName, HistogramMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
if mc.metricConflicts(metricName+"_sum", HistogramMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
if mc.metricConflicts(metricName+"_count", HistogramMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
if mc.metricConflicts(metricName+"_bucket", HistogramMetricType) {
|
|
||||||
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
|
||||||
}
|
|
||||||
metricsCount.WithLabelValues("histogram").Inc()
|
|
||||||
buckets := c.mapper.Defaults.Buckets
|
|
||||||
if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 {
|
|
||||||
buckets = mapping.Buckets
|
|
||||||
}
|
|
||||||
histogramVec = prometheus.NewHistogramVec(
|
|
||||||
prometheus.HistogramOpts{
|
|
||||||
Name: metricName,
|
|
||||||
Help: help,
|
|
||||||
Buckets: buckets,
|
|
||||||
}, labelNames)
|
|
||||||
if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.Elements[mapKey] = histogramVec
|
|
||||||
}
|
|
||||||
return histogramVec.GetMetricWith(labels)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *HistogramContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) {
|
|
||||||
mapKey := getContainerMapKey(metricName, labelNames)
|
|
||||||
if _, ok := c.Elements[mapKey]; ok {
|
|
||||||
c.Elements[mapKey].Delete(labels)
|
|
||||||
metricsCount.WithLabelValues("histogram").Dec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Event interface {
|
type Event interface {
|
||||||
MetricName() string
|
MetricName() string
|
||||||
Value() float64
|
Value() float64
|
||||||
|
@ -343,20 +89,9 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi
|
||||||
|
|
||||||
type Events []Event
|
type Events []Event
|
||||||
|
|
||||||
type LabelValues struct {
|
|
||||||
lastRegisteredAt time.Time
|
|
||||||
labels prometheus.Labels
|
|
||||||
ttl time.Duration
|
|
||||||
metricType metricType
|
|
||||||
}
|
|
||||||
|
|
||||||
type Exporter struct {
|
type Exporter struct {
|
||||||
Counters *CounterContainer
|
mapper *mapper.MetricMapper
|
||||||
Gauges *GaugeContainer
|
registry *registry
|
||||||
Summaries *SummaryContainer
|
|
||||||
Histograms *HistogramContainer
|
|
||||||
mapper *mapper.MetricMapper
|
|
||||||
labelValues map[string]map[uint64]*LabelValues
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replace invalid characters in the metric name with "_"
|
// Replace invalid characters in the metric name with "_"
|
||||||
|
@ -422,7 +157,7 @@ func (b *Exporter) Listen(e <-chan Events) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-removeStaleMetricsTicker.C:
|
case <-removeStaleMetricsTicker.C:
|
||||||
b.removeStaleMetrics()
|
b.registry.removeStaleMetrics()
|
||||||
case events, ok := <-e:
|
case events, ok := <-e:
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
||||||
|
@ -474,7 +209,6 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
metricName = escapeMetricName(event.MetricName())
|
metricName = escapeMetricName(event.MetricName())
|
||||||
}
|
}
|
||||||
|
|
||||||
sortedLabelNames := getSortedLabelNames(prometheusLabels)
|
|
||||||
switch ev := event.(type) {
|
switch ev := event.(type) {
|
||||||
case *CounterEvent:
|
case *CounterEvent:
|
||||||
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
||||||
|
@ -485,16 +219,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
counter, err := b.Counters.Get(
|
counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
counter.Add(event.Value())
|
counter.Add(event.Value())
|
||||||
b.saveLabelValues(metricName, CounterMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("counter").Inc()
|
eventStats.WithLabelValues("counter").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -502,13 +229,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case *GaugeEvent:
|
case *GaugeEvent:
|
||||||
gauge, err := b.Gauges.Get(
|
gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
)
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if ev.relative {
|
if ev.relative {
|
||||||
|
@ -516,7 +237,6 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
} else {
|
} else {
|
||||||
gauge.Set(event.Value())
|
gauge.Set(event.Value())
|
||||||
}
|
}
|
||||||
b.saveLabelValues(metricName, GaugeMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("gauge").Inc()
|
eventStats.WithLabelValues("gauge").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -534,17 +254,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
|
|
||||||
switch t {
|
switch t {
|
||||||
case mapper.TimerTypeHistogram:
|
case mapper.TimerTypeHistogram:
|
||||||
histogram, err := b.Histograms.Get(
|
histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
mapping,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
||||||
b.saveLabelValues(metricName, HistogramMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -552,17 +264,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
|
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
|
||||||
summary, err := b.Summaries.Get(
|
summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
mapping,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
||||||
b.saveLabelValues(metricName, SummaryMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -579,81 +283,10 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeStaleMetrics removes label values set from metric with stale values
|
|
||||||
func (b *Exporter) removeStaleMetrics() {
|
|
||||||
now := clock.Now()
|
|
||||||
// delete timeseries with expired ttl
|
|
||||||
for metricName := range b.labelValues {
|
|
||||||
for hash, lvs := range b.labelValues[metricName] {
|
|
||||||
if lvs.ttl == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) {
|
|
||||||
sortedLabelNames := getSortedLabelNames(lvs.labels)
|
|
||||||
b.Counters.Delete(metricName, sortedLabelNames, lvs.labels)
|
|
||||||
b.Gauges.Delete(metricName, sortedLabelNames, lvs.labels)
|
|
||||||
b.Summaries.Delete(metricName, sortedLabelNames, lvs.labels)
|
|
||||||
b.Histograms.Delete(metricName, sortedLabelNames, lvs.labels)
|
|
||||||
delete(b.labelValues[metricName], hash)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value
|
|
||||||
func (b *Exporter) saveLabelValues(metricName string, metricType metricType, labelNames []string, labels prometheus.Labels, ttl time.Duration) {
|
|
||||||
metric, hasMetric := b.labelValues[metricName]
|
|
||||||
if !hasMetric {
|
|
||||||
metric = make(map[uint64]*LabelValues)
|
|
||||||
b.labelValues[metricName] = metric
|
|
||||||
}
|
|
||||||
hash := hashNameAndLabels(metricName, labelNames, labels)
|
|
||||||
metricLabelValues, ok := metric[hash]
|
|
||||||
if !ok {
|
|
||||||
metricLabelValues = &LabelValues{
|
|
||||||
labels: labels,
|
|
||||||
ttl: ttl,
|
|
||||||
metricType: metricType,
|
|
||||||
}
|
|
||||||
b.labelValues[metricName][hash] = metricLabelValues
|
|
||||||
}
|
|
||||||
now := clock.Now()
|
|
||||||
metricLabelValues.lastRegisteredAt = now
|
|
||||||
// Update ttl from mapping
|
|
||||||
metricLabelValues.ttl = ttl
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Exporter) metricConflicts(metricName string, metricType metricType) bool {
|
|
||||||
metric, hasMetric := b.labelValues[metricName]
|
|
||||||
if !hasMetric {
|
|
||||||
// No metric with this name exists
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// The metric does exist. All metrics in the hash should be of the same
|
|
||||||
// type, so we pick check the first one we find in the hash to check the
|
|
||||||
// type.
|
|
||||||
for _, lvs := range metric {
|
|
||||||
if lvs.metricType == metricType {
|
|
||||||
// We've found a copy of this metric with this type, but different
|
|
||||||
// labels, so it's safe to create a new one.
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The metric exists, but it's of a different type than we're trying to
|
|
||||||
// create.
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
||||||
return &Exporter{
|
return &Exporter{
|
||||||
Counters: NewCounterContainer(),
|
mapper: mapper,
|
||||||
Gauges: NewGaugeContainer(),
|
registry: newRegistry(mapper),
|
||||||
Summaries: NewSummaryContainer(mapper),
|
|
||||||
Histograms: NewHistogramContainer(mapper),
|
|
||||||
mapper: mapper,
|
|
||||||
labelValues: make(map[string]map[uint64]*LabelValues),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,11 @@ package main
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
func benchmarkExporter(times int, b *testing.B) {
|
func benchmarkUDPListener(times int, b *testing.B) {
|
||||||
input := []string{
|
input := []string{
|
||||||
"foo1:2|c",
|
"foo1:2|c",
|
||||||
"foo2:3|g",
|
"foo2:3|g",
|
||||||
|
@ -50,12 +52,102 @@ func benchmarkExporter(times int, b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkExporter1(b *testing.B) {
|
func BenchmarkUDPListener1(b *testing.B) {
|
||||||
benchmarkExporter(1, b)
|
benchmarkUDPListener(1, b)
|
||||||
}
|
}
|
||||||
func BenchmarkExporter5(b *testing.B) {
|
func BenchmarkUDPListener5(b *testing.B) {
|
||||||
benchmarkExporter(5, b)
|
benchmarkUDPListener(5, b)
|
||||||
}
|
}
|
||||||
func BenchmarkExporter50(b *testing.B) {
|
func BenchmarkUDPListener50(b *testing.B) {
|
||||||
benchmarkExporter(50, b)
|
benchmarkUDPListener(50, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkExporterListener(b *testing.B) {
|
||||||
|
events := Events{
|
||||||
|
&CounterEvent{ // simple counter
|
||||||
|
metricName: "counter",
|
||||||
|
value: 2,
|
||||||
|
},
|
||||||
|
&GaugeEvent{ // simple gauge
|
||||||
|
metricName: "gauge",
|
||||||
|
value: 10,
|
||||||
|
},
|
||||||
|
&TimerEvent{ // simple timer
|
||||||
|
metricName: "timer",
|
||||||
|
value: 200,
|
||||||
|
},
|
||||||
|
&TimerEvent{ // simple histogram
|
||||||
|
metricName: "histogram.test",
|
||||||
|
value: 200,
|
||||||
|
},
|
||||||
|
&CounterEvent{ // simple_tags
|
||||||
|
metricName: "simple_tags",
|
||||||
|
value: 100,
|
||||||
|
labels: map[string]string{
|
||||||
|
"alpha": "bar",
|
||||||
|
"bravo": "baz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&CounterEvent{ // slightly different tags
|
||||||
|
metricName: "simple_tags",
|
||||||
|
value: 100,
|
||||||
|
labels: map[string]string{
|
||||||
|
"alpha": "bar",
|
||||||
|
"charlie": "baz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&CounterEvent{ // and even more different tags
|
||||||
|
metricName: "simple_tags",
|
||||||
|
value: 100,
|
||||||
|
labels: map[string]string{
|
||||||
|
"alpha": "bar",
|
||||||
|
"bravo": "baz",
|
||||||
|
"golf": "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&CounterEvent{ // datadog tag extension with complex tags
|
||||||
|
metricName: "foo",
|
||||||
|
value: 100,
|
||||||
|
labels: map[string]string{
|
||||||
|
"action": "test",
|
||||||
|
"application": "testapp",
|
||||||
|
"application_component": "testcomp",
|
||||||
|
"application_role": "test_role",
|
||||||
|
"category": "category",
|
||||||
|
"controller": "controller",
|
||||||
|
"deployed_to": "production",
|
||||||
|
"kube_deployment": "deploy",
|
||||||
|
"kube_namespace": "kube-production",
|
||||||
|
"method": "get",
|
||||||
|
"version": "5.2.8374",
|
||||||
|
"status": "200",
|
||||||
|
"status_range": "2xx",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config := `
|
||||||
|
mappings:
|
||||||
|
- match: histogram.test
|
||||||
|
timer_type: histogram
|
||||||
|
name: "histogram_test"
|
||||||
|
`
|
||||||
|
|
||||||
|
testMapper := &mapper.MetricMapper{}
|
||||||
|
err := testMapper.InitFromYAMLString(config, 0)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("Config load error: %s %s", config, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ex := NewExporter(testMapper)
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ec := make(chan Events, 1000)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
ec <- events
|
||||||
|
}
|
||||||
|
close(ec)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ex.Listen(ec)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -531,7 +531,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
|
||||||
|
|
||||||
metrics, err := prometheus.DefaultGatherer.Gather()
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Gather should not fail")
|
t.Fatal("Gather should not fail: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var metricFamily *dto.MetricFamily
|
var metricFamily *dto.MetricFamily
|
||||||
|
@ -591,6 +591,52 @@ func TestHistogramUnits(t *testing.T) {
|
||||||
t.Fatalf("Received unexpected value for histogram observation %f != .300", *value)
|
t.Fatalf("Received unexpected value for histogram observation %f != .300", *value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func TestCounterIncrement(t *testing.T) {
|
||||||
|
// Start exporter with a synchronous channel
|
||||||
|
events := make(chan Events)
|
||||||
|
go func() {
|
||||||
|
testMapper := mapper.MetricMapper{}
|
||||||
|
testMapper.InitCache(0)
|
||||||
|
ex := NewExporter(&testMapper)
|
||||||
|
ex.Listen(events)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Synchronously send a statsd event to wait for handleEvent execution.
|
||||||
|
// Then close events channel to stop a listener.
|
||||||
|
name := "foo_counter"
|
||||||
|
labels := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
c := Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: name,
|
||||||
|
value: 1,
|
||||||
|
labels: labels,
|
||||||
|
},
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: name,
|
||||||
|
value: 1,
|
||||||
|
labels: labels,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events <- c
|
||||||
|
// Push empty event so that we block until the first event is consumed.
|
||||||
|
events <- Events{}
|
||||||
|
close(events)
|
||||||
|
|
||||||
|
// Check histogram value
|
||||||
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
||||||
|
}
|
||||||
|
value := getFloat64(metrics, name, labels)
|
||||||
|
if value == nil {
|
||||||
|
t.Fatal("Counter value should not be nil")
|
||||||
|
}
|
||||||
|
if *value != 2 {
|
||||||
|
t.Fatalf("Counter wasn't incremented properly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type statsDPacketHandler interface {
|
type statsDPacketHandler interface {
|
||||||
handlePacket(packet []byte, e chan<- Events)
|
handlePacket(packet []byte, e chan<- Events)
|
||||||
|
@ -764,6 +810,37 @@ mappings:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHashLabelNames(t *testing.T) {
|
||||||
|
r := newRegistry(nil)
|
||||||
|
// Validate value hash changes and name has doesn't when just the value changes.
|
||||||
|
hash1, _ := r.hashLabels(map[string]string{
|
||||||
|
"label": "value1",
|
||||||
|
})
|
||||||
|
hash2, _ := r.hashLabels(map[string]string{
|
||||||
|
"label": "value2",
|
||||||
|
})
|
||||||
|
if hash1.names != hash2.names {
|
||||||
|
t.Fatal("Hash of label names should match, but doesn't")
|
||||||
|
}
|
||||||
|
if hash1.values == hash2.values {
|
||||||
|
t.Fatal("Hash of label names shouldn't match, but do")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate value and name hashes change when the name changes.
|
||||||
|
hash1, _ = r.hashLabels(map[string]string{
|
||||||
|
"label1": "value",
|
||||||
|
})
|
||||||
|
hash2, _ = r.hashLabels(map[string]string{
|
||||||
|
"label2": "value",
|
||||||
|
})
|
||||||
|
if hash1.names == hash2.names {
|
||||||
|
t.Fatal("Hash of label names shouldn't match, but do")
|
||||||
|
}
|
||||||
|
if hash1.values == hash2.values {
|
||||||
|
t.Fatal("Hash of label names shouldn't match, but do")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
|
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
|
||||||
// Method returns a value or nil if metric is not found.
|
// Method returns a value or nil if metric is not found.
|
||||||
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
|
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
|
||||||
|
@ -779,13 +856,11 @@ func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labe
|
||||||
}
|
}
|
||||||
|
|
||||||
var metric *dto.Metric
|
var metric *dto.Metric
|
||||||
sortedLabelNames := getSortedLabelNames(labels)
|
labelStr := fmt.Sprintf("%v", labels)
|
||||||
labelsHash := hashNameAndLabels(name, sortedLabelNames, labels)
|
|
||||||
for _, m := range metricFamily.Metric {
|
for _, m := range metricFamily.Metric {
|
||||||
l := labelPairsAsLabels(m.GetLabel())
|
l := labelPairsAsLabels(m.GetLabel())
|
||||||
sln := getSortedLabelNames(l)
|
ls := fmt.Sprintf("%v", l)
|
||||||
h := hashNameAndLabels(name, sln, l)
|
if labelStr == ls {
|
||||||
if h == labelsHash {
|
|
||||||
metric = m
|
metric = m
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -889,17 +964,14 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no labels",
|
name: "no labels",
|
||||||
metric: "counter",
|
|
||||||
labels: map[string]string{},
|
labels: map[string]string{},
|
||||||
}, {
|
}, {
|
||||||
name: "one label",
|
name: "one label",
|
||||||
metric: "counter",
|
|
||||||
labels: map[string]string{
|
labels: map[string]string{
|
||||||
"label": "value",
|
"label": "value",
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
name: "many labels",
|
name: "many labels",
|
||||||
metric: "counter",
|
|
||||||
labels: map[string]string{
|
labels: map[string]string{
|
||||||
"label0": "value",
|
"label0": "value",
|
||||||
"label1": "value",
|
"label1": "value",
|
||||||
|
@ -915,11 +987,11 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r := newRegistry(nil)
|
||||||
for _, s := range scenarios {
|
for _, s := range scenarios {
|
||||||
sortedLabelNames := getSortedLabelNames(s.labels)
|
|
||||||
b.Run(s.name, func(b *testing.B) {
|
b.Run(s.name, func(b *testing.B) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
hashNameAndLabels(s.metric, sortedLabelNames, s.labels)
|
r.hashLabels(s.labels)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
407
registry.go
Normal file
407
registry.go
Normal file
|
@ -0,0 +1,407 @@
|
||||||
|
// Copyright 2013 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"hash"
|
||||||
|
"hash/fnv"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
|
)
|
||||||
|
|
||||||
|
type metricType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CounterMetricType metricType = iota
|
||||||
|
GaugeMetricType
|
||||||
|
SummaryMetricType
|
||||||
|
HistogramMetricType
|
||||||
|
)
|
||||||
|
|
||||||
|
type nameHash uint64
|
||||||
|
type valueHash uint64
|
||||||
|
type labelHash struct {
|
||||||
|
// This is a hash over the label names
|
||||||
|
names nameHash
|
||||||
|
// This is a hash over the label names + label values
|
||||||
|
values valueHash
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricHolder interface{}
|
||||||
|
|
||||||
|
type registeredMetric struct {
|
||||||
|
lastRegisteredAt time.Time
|
||||||
|
labels prometheus.Labels
|
||||||
|
ttl time.Duration
|
||||||
|
metric metricHolder
|
||||||
|
vecKey nameHash
|
||||||
|
}
|
||||||
|
|
||||||
|
type vectorHolder interface {
|
||||||
|
Delete(label prometheus.Labels) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type vector struct {
|
||||||
|
holder vectorHolder
|
||||||
|
refCount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type metric struct {
|
||||||
|
metricType metricType
|
||||||
|
// Vectors key is the hash of the label names
|
||||||
|
vectors map[nameHash]*vector
|
||||||
|
// Metrics key is a hash of the label names + label values
|
||||||
|
metrics map[valueHash]*registeredMetric
|
||||||
|
}
|
||||||
|
|
||||||
|
type registry struct {
|
||||||
|
metrics map[string]metric
|
||||||
|
mapper *mapper.MetricMapper
|
||||||
|
// The below value and label variables are allocated in the registry struct
|
||||||
|
// so that we don't have to allocate them every time have to compute a label
|
||||||
|
// hash.
|
||||||
|
valueBuf, nameBuf bytes.Buffer
|
||||||
|
hasher hash.Hash64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRegistry(mapper *mapper.MetricMapper) *registry {
|
||||||
|
return ®istry{
|
||||||
|
metrics: make(map[string]metric),
|
||||||
|
mapper: mapper,
|
||||||
|
hasher: fnv.New64a(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) metricConflicts(metricName string, metricType metricType) bool {
|
||||||
|
vector, hasMetric := r.metrics[metricName]
|
||||||
|
if !hasMetric {
|
||||||
|
// No metric with this name exists
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if vector.metricType == metricType {
|
||||||
|
// We've found a copy of this metric with this type, but different
|
||||||
|
// labels, so it's safe to create a new one.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// The metric exists, but it's of a different type than we're trying to
|
||||||
|
// create.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) {
|
||||||
|
metric, hasMetric := r.metrics[metricName]
|
||||||
|
if !hasMetric {
|
||||||
|
metric.metricType = metricType
|
||||||
|
metric.vectors = make(map[nameHash]*vector)
|
||||||
|
metric.metrics = make(map[valueHash]*registeredMetric)
|
||||||
|
|
||||||
|
r.metrics[metricName] = metric
|
||||||
|
}
|
||||||
|
|
||||||
|
v, ok := metric.vectors[hash.names]
|
||||||
|
if !ok {
|
||||||
|
v = &vector{holder: vh}
|
||||||
|
metric.vectors[hash.names] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
rm, ok := metric.metrics[hash.values]
|
||||||
|
if !ok {
|
||||||
|
rm = ®isteredMetric{
|
||||||
|
labels: labels,
|
||||||
|
ttl: ttl,
|
||||||
|
metric: mh,
|
||||||
|
vecKey: hash.names,
|
||||||
|
}
|
||||||
|
metric.metrics[hash.values] = rm
|
||||||
|
v.refCount++
|
||||||
|
}
|
||||||
|
now := clock.Now()
|
||||||
|
rm.lastRegisteredAt = now
|
||||||
|
// Update ttl from mapping
|
||||||
|
rm.ttl = ttl
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) {
|
||||||
|
metric, hasMetric := r.metrics[metricName]
|
||||||
|
|
||||||
|
if !hasMetric {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if metric.metricType != metricType {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rm, ok := metric.metrics[hash.values]
|
||||||
|
if ok {
|
||||||
|
return metric.vectors[hash.names].holder, rm.metric
|
||||||
|
}
|
||||||
|
|
||||||
|
vector, ok := metric.vectors[hash.names]
|
||||||
|
if ok {
|
||||||
|
return vector.holder, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, CounterMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Counter), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, CounterMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var counterVec *prometheus.CounterVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("counter").Inc()
|
||||||
|
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
counterVec = vh.(*prometheus.CounterVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var counter prometheus.Counter
|
||||||
|
var err error
|
||||||
|
if counter, err = counterVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl)
|
||||||
|
|
||||||
|
return counter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, GaugeMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Gauge), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, GaugeMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gaugeVec *prometheus.GaugeVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("gauge").Inc()
|
||||||
|
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
gaugeVec = vh.(*prometheus.GaugeVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gauge prometheus.Gauge
|
||||||
|
var err error
|
||||||
|
if gauge, err = gaugeVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl)
|
||||||
|
|
||||||
|
return gauge, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, HistogramMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Observer), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_sum", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_count", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_bucket", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var histogramVec *prometheus.HistogramVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("histogram").Inc()
|
||||||
|
buckets := r.mapper.Defaults.Buckets
|
||||||
|
if mapping.Buckets != nil && len(mapping.Buckets) > 0 {
|
||||||
|
buckets = mapping.Buckets
|
||||||
|
}
|
||||||
|
histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
Buckets: buckets,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
histogramVec = vh.(*prometheus.HistogramVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var observer prometheus.Observer
|
||||||
|
var err error
|
||||||
|
if observer, err = histogramVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl)
|
||||||
|
|
||||||
|
return observer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, SummaryMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Observer), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_sum", SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_count", SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var summaryVec *prometheus.SummaryVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("summary").Inc()
|
||||||
|
quantiles := r.mapper.Defaults.Quantiles
|
||||||
|
if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 {
|
||||||
|
quantiles = mapping.Quantiles
|
||||||
|
}
|
||||||
|
objectives := make(map[float64]float64)
|
||||||
|
for _, q := range quantiles {
|
||||||
|
objectives[q.Quantile] = q.Error
|
||||||
|
}
|
||||||
|
// In the case of no mapping file, explicitly define the default quantiles
|
||||||
|
if len(objectives) == 0 {
|
||||||
|
objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
|
||||||
|
}
|
||||||
|
summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
Objectives: objectives,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
summaryVec = vh.(*prometheus.SummaryVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var observer prometheus.Observer
|
||||||
|
var err error
|
||||||
|
if observer, err = summaryVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl)
|
||||||
|
|
||||||
|
return observer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) removeStaleMetrics() {
|
||||||
|
now := clock.Now()
|
||||||
|
// delete timeseries with expired ttl
|
||||||
|
for _, metric := range r.metrics {
|
||||||
|
for hash, rm := range metric.metrics {
|
||||||
|
if rm.ttl == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if rm.lastRegisteredAt.Add(rm.ttl).Before(now) {
|
||||||
|
metric.vectors[rm.vecKey].holder.Delete(rm.labels)
|
||||||
|
metric.vectors[rm.vecKey].refCount--
|
||||||
|
delete(metric.metrics, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculates a hash of both the label names and the label names and values.
|
||||||
|
func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) {
|
||||||
|
r.hasher.Reset()
|
||||||
|
r.nameBuf.Reset()
|
||||||
|
r.valueBuf.Reset()
|
||||||
|
labelNames := make([]string, 0, len(labels))
|
||||||
|
|
||||||
|
for labelName := range labels {
|
||||||
|
labelNames = append(labelNames, labelName)
|
||||||
|
}
|
||||||
|
sort.Strings(labelNames)
|
||||||
|
|
||||||
|
r.valueBuf.WriteByte(model.SeparatorByte)
|
||||||
|
for _, labelName := range labelNames {
|
||||||
|
r.valueBuf.WriteString(labels[labelName])
|
||||||
|
r.valueBuf.WriteByte(model.SeparatorByte)
|
||||||
|
|
||||||
|
r.nameBuf.WriteString(labelName)
|
||||||
|
r.nameBuf.WriteByte(model.SeparatorByte)
|
||||||
|
}
|
||||||
|
|
||||||
|
lh := labelHash{}
|
||||||
|
r.hasher.Write(r.nameBuf.Bytes())
|
||||||
|
lh.names = nameHash(r.hasher.Sum64())
|
||||||
|
|
||||||
|
// Now add the values to the names we've already hashed.
|
||||||
|
r.hasher.Write(r.valueBuf.Bytes())
|
||||||
|
lh.values = valueHash(r.hasher.Sum64())
|
||||||
|
|
||||||
|
return lh, labelNames
|
||||||
|
}
|
Loading…
Reference in a new issue