diff --git a/exporter.go b/exporter.go index e22f5e4..0485394 100644 --- a/exporter.go +++ b/exporter.go @@ -15,12 +15,9 @@ package main import ( "bufio" - "bytes" "fmt" - "hash/fnv" "io" "net" - "sort" "strconv" "strings" "time" @@ -28,7 +25,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" - "github.com/prometheus/common/model" "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" @@ -39,12 +35,6 @@ const ( regErrF = "Failed to update metric %q. Error: %s" ) -var ( - hash = fnv.New64a() - strBuf bytes.Buffer // Used for hashing. - intBuf = make([]byte, 8) -) - // uncheckedCollector wraps a Collector but its Describe method yields no Desc. // This allows incoming metrics to have inconsistent label sets type uncheckedCollector struct { @@ -56,250 +46,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { u.c.Collect(c) } -type metricType int - -const ( - CounterMetricType metricType = iota - GaugeMetricType - SummaryMetricType - HistogramMetricType -) - -type metricChecker interface { - metricConflicts(string, metricType) bool -} - -func getSortedLabelNames(labels prometheus.Labels) []string { - names := make([]string, 0, len(labels)) - for labelName := range labels { - names = append(names, labelName) - } - sort.Strings(names) - return names -} - -func getContainerMapKey(metricName string, labelNames []string) string { - return metricName + "," + strings.Join(labelNames, ",") -} - -// hashNameAndLabels returns a hash value of the provided name string and all -// the label names and values in the provided labels map. -// -// Not safe for concurrent use! (Uses a shared buffer and hasher to save on -// allocations.) -func hashNameAndLabels(name string, labelNames []string, labels prometheus.Labels) uint64 { - hash.Reset() - strBuf.Reset() - strBuf.WriteString(name) - strBuf.WriteByte(model.SeparatorByte) - - for _, labelName := range labelNames { - strBuf.WriteString(labelName) - strBuf.WriteByte(model.SeparatorByte) - strBuf.WriteString(labels[labelName]) - strBuf.WriteByte(model.SeparatorByte) - } - - hash.Write(strBuf.Bytes()) - return hash.Sum64() -} - -type CounterContainer struct { - // metric name - Elements map[string]*prometheus.CounterVec -} - -func NewCounterContainer() *CounterContainer { - return &CounterContainer{ - Elements: make(map[string]*prometheus.CounterVec), - } -} - -func (c *CounterContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string) (prometheus.Counter, error) { - mapKey := getContainerMapKey(metricName, labelNames) - - counterVec, ok := c.Elements[mapKey] - if !ok { - if mc.metricConflicts(metricName, CounterMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - metricsCount.WithLabelValues("counter").Inc() - counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: metricName, - Help: help, - }, labelNames) - if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { - return nil, err - } - c.Elements[mapKey] = counterVec - } - return counterVec.GetMetricWith(labels) -} - -func (c *CounterContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) { - mapKey := getContainerMapKey(metricName, labelNames) - if _, ok := c.Elements[mapKey]; ok { - c.Elements[mapKey].Delete(labels) - metricsCount.WithLabelValues("counter").Dec() - } -} - -type GaugeContainer struct { - Elements map[string]*prometheus.GaugeVec -} - -func NewGaugeContainer() *GaugeContainer { - return &GaugeContainer{ - Elements: make(map[string]*prometheus.GaugeVec), - } -} - -func (c *GaugeContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string) (prometheus.Gauge, error) { - mapKey := getContainerMapKey(metricName, labelNames) - - gaugeVec, ok := c.Elements[mapKey] - if !ok { - if mc.metricConflicts(metricName, GaugeMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - metricsCount.WithLabelValues("gauge").Inc() - gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: metricName, - Help: help, - }, labelNames) - if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { - return nil, err - } - c.Elements[mapKey] = gaugeVec - } - return gaugeVec.GetMetricWith(labels) -} - -func (c *GaugeContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) { - mapKey := getContainerMapKey(metricName, labelNames) - if _, ok := c.Elements[mapKey]; ok { - c.Elements[mapKey].Delete(labels) - metricsCount.WithLabelValues("gauge").Dec() - } -} - -type SummaryContainer struct { - Elements map[string]*prometheus.SummaryVec - mapper *mapper.MetricMapper -} - -func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer { - return &SummaryContainer{ - Elements: make(map[string]*prometheus.SummaryVec), - mapper: mapper, - } -} - -func (c *SummaryContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - mapKey := getContainerMapKey(metricName, labelNames) - - summaryVec, ok := c.Elements[mapKey] - if !ok { - if mc.metricConflicts(metricName, SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if mc.metricConflicts(metricName+"_sum", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if mc.metricConflicts(metricName+"_count", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - metricsCount.WithLabelValues("summary").Inc() - quantiles := c.mapper.Defaults.Quantiles - if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 { - quantiles = mapping.Quantiles - } - objectives := make(map[float64]float64) - for _, q := range quantiles { - objectives[q.Quantile] = q.Error - } - // In the case of no mapping file, explicitly define the default quantiles - if len(objectives) == 0 { - objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} - } - summaryVec = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: metricName, - Help: help, - Objectives: objectives, - }, labelNames) - if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { - return nil, err - } - c.Elements[mapKey] = summaryVec - } - return summaryVec.GetMetricWith(labels) -} - -func (c *SummaryContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) { - mapKey := getContainerMapKey(metricName, labelNames) - if _, ok := c.Elements[mapKey]; ok { - c.Elements[mapKey].Delete(labels) - metricsCount.WithLabelValues("summary").Dec() - } -} - -type HistogramContainer struct { - Elements map[string]*prometheus.HistogramVec - mapper *mapper.MetricMapper -} - -func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer { - return &HistogramContainer{ - Elements: make(map[string]*prometheus.HistogramVec), - mapper: mapper, - } -} - -func (c *HistogramContainer) Get(metricName string, labelNames []string, labels prometheus.Labels, mc metricChecker, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - mapKey := getContainerMapKey(metricName, labelNames) - - histogramVec, ok := c.Elements[mapKey] - if !ok { - if mc.metricConflicts(metricName, HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if mc.metricConflicts(metricName+"_sum", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if mc.metricConflicts(metricName+"_count", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if mc.metricConflicts(metricName+"_bucket", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - metricsCount.WithLabelValues("histogram").Inc() - buckets := c.mapper.Defaults.Buckets - if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 { - buckets = mapping.Buckets - } - histogramVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: metricName, - Help: help, - Buckets: buckets, - }, labelNames) - if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { - return nil, err - } - c.Elements[mapKey] = histogramVec - } - return histogramVec.GetMetricWith(labels) -} - -func (c *HistogramContainer) Delete(metricName string, labelNames []string, labels prometheus.Labels) { - mapKey := getContainerMapKey(metricName, labelNames) - if _, ok := c.Elements[mapKey]; ok { - c.Elements[mapKey].Delete(labels) - metricsCount.WithLabelValues("histogram").Dec() - } -} - type Event interface { MetricName() string Value() float64 @@ -343,20 +89,9 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi type Events []Event -type LabelValues struct { - lastRegisteredAt time.Time - labels prometheus.Labels - ttl time.Duration - metricType metricType -} - type Exporter struct { - Counters *CounterContainer - Gauges *GaugeContainer - Summaries *SummaryContainer - Histograms *HistogramContainer - mapper *mapper.MetricMapper - labelValues map[string]map[uint64]*LabelValues + mapper *mapper.MetricMapper + registry *registry } // Replace invalid characters in the metric name with "_" @@ -422,7 +157,7 @@ func (b *Exporter) Listen(e <-chan Events) { for { select { case <-removeStaleMetricsTicker.C: - b.removeStaleMetrics() + b.registry.removeStaleMetrics() case events, ok := <-e: if !ok { log.Debug("Channel is closed. Break out of Exporter.Listener.") @@ -474,7 +209,6 @@ func (b *Exporter) handleEvent(event Event) { metricName = escapeMetricName(event.MetricName()) } - sortedLabelNames := getSortedLabelNames(prometheusLabels) switch ev := event.(type) { case *CounterEvent: // We don't accept negative values for counters. Incrementing the counter with a negative number @@ -485,16 +219,9 @@ func (b *Exporter) handleEvent(event Event) { return } - counter, err := b.Counters.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - ) + counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping) if err == nil { counter.Add(event.Value()) - b.saveLabelValues(metricName, CounterMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("counter").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -502,13 +229,7 @@ func (b *Exporter) handleEvent(event Event) { } case *GaugeEvent: - gauge, err := b.Gauges.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - ) + gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping) if err == nil { if ev.relative { @@ -516,7 +237,6 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } - b.saveLabelValues(metricName, GaugeMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("gauge").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -534,17 +254,9 @@ func (b *Exporter) handleEvent(event Event) { switch t { case mapper.TimerTypeHistogram: - histogram, err := b.Histograms.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - mapping, - ) + histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping) if err == nil { histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - b.saveLabelValues(metricName, HistogramMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -552,17 +264,9 @@ func (b *Exporter) handleEvent(event Event) { } case mapper.TimerTypeDefault, mapper.TimerTypeSummary: - summary, err := b.Summaries.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - mapping, - ) + summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping) if err == nil { summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - b.saveLabelValues(metricName, SummaryMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -579,81 +283,10 @@ func (b *Exporter) handleEvent(event Event) { } } -// removeStaleMetrics removes label values set from metric with stale values -func (b *Exporter) removeStaleMetrics() { - now := clock.Now() - // delete timeseries with expired ttl - for metricName := range b.labelValues { - for hash, lvs := range b.labelValues[metricName] { - if lvs.ttl == 0 { - continue - } - if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) { - sortedLabelNames := getSortedLabelNames(lvs.labels) - b.Counters.Delete(metricName, sortedLabelNames, lvs.labels) - b.Gauges.Delete(metricName, sortedLabelNames, lvs.labels) - b.Summaries.Delete(metricName, sortedLabelNames, lvs.labels) - b.Histograms.Delete(metricName, sortedLabelNames, lvs.labels) - delete(b.labelValues[metricName], hash) - } - } - } -} - -// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value -func (b *Exporter) saveLabelValues(metricName string, metricType metricType, labelNames []string, labels prometheus.Labels, ttl time.Duration) { - metric, hasMetric := b.labelValues[metricName] - if !hasMetric { - metric = make(map[uint64]*LabelValues) - b.labelValues[metricName] = metric - } - hash := hashNameAndLabels(metricName, labelNames, labels) - metricLabelValues, ok := metric[hash] - if !ok { - metricLabelValues = &LabelValues{ - labels: labels, - ttl: ttl, - metricType: metricType, - } - b.labelValues[metricName][hash] = metricLabelValues - } - now := clock.Now() - metricLabelValues.lastRegisteredAt = now - // Update ttl from mapping - metricLabelValues.ttl = ttl -} - -func (b *Exporter) metricConflicts(metricName string, metricType metricType) bool { - metric, hasMetric := b.labelValues[metricName] - if !hasMetric { - // No metric with this name exists - return false - } - - // The metric does exist. All metrics in the hash should be of the same - // type, so we pick check the first one we find in the hash to check the - // type. - for _, lvs := range metric { - if lvs.metricType == metricType { - // We've found a copy of this metric with this type, but different - // labels, so it's safe to create a new one. - return false - } - } - - // The metric exists, but it's of a different type than we're trying to - // create. - return true -} - func NewExporter(mapper *mapper.MetricMapper) *Exporter { return &Exporter{ - Counters: NewCounterContainer(), - Gauges: NewGaugeContainer(), - Summaries: NewSummaryContainer(mapper), - Histograms: NewHistogramContainer(mapper), - mapper: mapper, - labelValues: make(map[string]map[uint64]*LabelValues), + mapper: mapper, + registry: newRegistry(mapper), } } diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 1c3104c..52cf173 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -16,9 +16,11 @@ package main import ( "fmt" "testing" + + "github.com/prometheus/statsd_exporter/pkg/mapper" ) -func benchmarkExporter(times int, b *testing.B) { +func benchmarkUDPListener(times int, b *testing.B) { input := []string{ "foo1:2|c", "foo2:3|g", @@ -50,12 +52,102 @@ func benchmarkExporter(times int, b *testing.B) { } } -func BenchmarkExporter1(b *testing.B) { - benchmarkExporter(1, b) +func BenchmarkUDPListener1(b *testing.B) { + benchmarkUDPListener(1, b) } -func BenchmarkExporter5(b *testing.B) { - benchmarkExporter(5, b) +func BenchmarkUDPListener5(b *testing.B) { + benchmarkUDPListener(5, b) } -func BenchmarkExporter50(b *testing.B) { - benchmarkExporter(50, b) +func BenchmarkUDPListener50(b *testing.B) { + benchmarkUDPListener(50, b) +} + +func BenchmarkExporterListener(b *testing.B) { + events := Events{ + &CounterEvent{ // simple counter + metricName: "counter", + value: 2, + }, + &GaugeEvent{ // simple gauge + metricName: "gauge", + value: 10, + }, + &TimerEvent{ // simple timer + metricName: "timer", + value: 200, + }, + &TimerEvent{ // simple histogram + metricName: "histogram.test", + value: 200, + }, + &CounterEvent{ // simple_tags + metricName: "simple_tags", + value: 100, + labels: map[string]string{ + "alpha": "bar", + "bravo": "baz", + }, + }, + &CounterEvent{ // slightly different tags + metricName: "simple_tags", + value: 100, + labels: map[string]string{ + "alpha": "bar", + "charlie": "baz", + }, + }, + &CounterEvent{ // and even more different tags + metricName: "simple_tags", + value: 100, + labels: map[string]string{ + "alpha": "bar", + "bravo": "baz", + "golf": "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong", + }, + }, + &CounterEvent{ // datadog tag extension with complex tags + metricName: "foo", + value: 100, + labels: map[string]string{ + "action": "test", + "application": "testapp", + "application_component": "testcomp", + "application_role": "test_role", + "category": "category", + "controller": "controller", + "deployed_to": "production", + "kube_deployment": "deploy", + "kube_namespace": "kube-production", + "method": "get", + "version": "5.2.8374", + "status": "200", + "status_range": "2xx", + }, + }, + } + config := ` +mappings: +- match: histogram.test + timer_type: histogram + name: "histogram_test" +` + + testMapper := &mapper.MetricMapper{} + err := testMapper.InitFromYAMLString(config, 0) + if err != nil { + b.Fatalf("Config load error: %s %s", config, err) + } + + ex := NewExporter(testMapper) + for i := 0; i < b.N; i++ { + ec := make(chan Events, 1000) + go func() { + for i := 0; i < 1000; i++ { + ec <- events + } + close(ec) + }() + + ex.Listen(ec) + } } diff --git a/exporter_test.go b/exporter_test.go index fc3c070..7479232 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -531,7 +531,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { - t.Fatal("Gather should not fail") + t.Fatal("Gather should not fail: ", err) } var metricFamily *dto.MetricFamily @@ -591,6 +591,52 @@ func TestHistogramUnits(t *testing.T) { t.Fatalf("Received unexpected value for histogram observation %f != .300", *value) } } +func TestCounterIncrement(t *testing.T) { + // Start exporter with a synchronous channel + events := make(chan Events) + go func() { + testMapper := mapper.MetricMapper{} + testMapper.InitCache(0) + ex := NewExporter(&testMapper) + ex.Listen(events) + }() + + // Synchronously send a statsd event to wait for handleEvent execution. + // Then close events channel to stop a listener. + name := "foo_counter" + labels := map[string]string{ + "foo": "bar", + } + c := Events{ + &CounterEvent{ + metricName: name, + value: 1, + labels: labels, + }, + &CounterEvent{ + metricName: name, + value: 1, + labels: labels, + }, + } + events <- c + // Push empty event so that we block until the first event is consumed. + events <- Events{} + close(events) + + // Check histogram value + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatalf("Cannot gather from DefaultGatherer: %v", err) + } + value := getFloat64(metrics, name, labels) + if value == nil { + t.Fatal("Counter value should not be nil") + } + if *value != 2 { + t.Fatalf("Counter wasn't incremented properly") + } +} type statsDPacketHandler interface { handlePacket(packet []byte, e chan<- Events) @@ -764,6 +810,37 @@ mappings: } } +func TestHashLabelNames(t *testing.T) { + r := newRegistry(nil) + // Validate value hash changes and name has doesn't when just the value changes. + hash1, _ := r.hashLabels(map[string]string{ + "label": "value1", + }) + hash2, _ := r.hashLabels(map[string]string{ + "label": "value2", + }) + if hash1.names != hash2.names { + t.Fatal("Hash of label names should match, but doesn't") + } + if hash1.values == hash2.values { + t.Fatal("Hash of label names shouldn't match, but do") + } + + // Validate value and name hashes change when the name changes. + hash1, _ = r.hashLabels(map[string]string{ + "label1": "value", + }) + hash2, _ = r.hashLabels(map[string]string{ + "label2": "value", + }) + if hash1.names == hash2.names { + t.Fatal("Hash of label names shouldn't match, but do") + } + if hash1.values == hash2.values { + t.Fatal("Hash of label names shouldn't match, but do") + } +} + // getFloat64 search for metric by name in array of MetricFamily and then search a value by labels. // Method returns a value or nil if metric is not found. func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 { @@ -779,13 +856,11 @@ func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labe } var metric *dto.Metric - sortedLabelNames := getSortedLabelNames(labels) - labelsHash := hashNameAndLabels(name, sortedLabelNames, labels) + labelStr := fmt.Sprintf("%v", labels) for _, m := range metricFamily.Metric { l := labelPairsAsLabels(m.GetLabel()) - sln := getSortedLabelNames(l) - h := hashNameAndLabels(name, sln, l) - if h == labelsHash { + ls := fmt.Sprintf("%v", l) + if labelStr == ls { metric = m break } @@ -889,17 +964,14 @@ func BenchmarkHashNameAndLabels(b *testing.B) { }{ { name: "no labels", - metric: "counter", labels: map[string]string{}, }, { - name: "one label", - metric: "counter", + name: "one label", labels: map[string]string{ "label": "value", }, }, { - name: "many labels", - metric: "counter", + name: "many labels", labels: map[string]string{ "label0": "value", "label1": "value", @@ -915,11 +987,11 @@ func BenchmarkHashNameAndLabels(b *testing.B) { }, } + r := newRegistry(nil) for _, s := range scenarios { - sortedLabelNames := getSortedLabelNames(s.labels) b.Run(s.name, func(b *testing.B) { for n := 0; n < b.N; n++ { - hashNameAndLabels(s.metric, sortedLabelNames, s.labels) + r.hashLabels(s.labels) } }) } diff --git a/registry.go b/registry.go new file mode 100644 index 0000000..64a58b3 --- /dev/null +++ b/registry.go @@ -0,0 +1,407 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "fmt" + "hash" + "hash/fnv" + "sort" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" +) + +type metricType int + +const ( + CounterMetricType metricType = iota + GaugeMetricType + SummaryMetricType + HistogramMetricType +) + +type nameHash uint64 +type valueHash uint64 +type labelHash struct { + // This is a hash over the label names + names nameHash + // This is a hash over the label names + label values + values valueHash +} + +type metricHolder interface{} + +type registeredMetric struct { + lastRegisteredAt time.Time + labels prometheus.Labels + ttl time.Duration + metric metricHolder + vecKey nameHash +} + +type vectorHolder interface { + Delete(label prometheus.Labels) bool +} + +type vector struct { + holder vectorHolder + refCount uint64 +} + +type metric struct { + metricType metricType + // Vectors key is the hash of the label names + vectors map[nameHash]*vector + // Metrics key is a hash of the label names + label values + metrics map[valueHash]*registeredMetric +} + +type registry struct { + metrics map[string]metric + mapper *mapper.MetricMapper + // The below value and label variables are allocated in the registry struct + // so that we don't have to allocate them every time have to compute a label + // hash. + valueBuf, nameBuf bytes.Buffer + hasher hash.Hash64 +} + +func newRegistry(mapper *mapper.MetricMapper) *registry { + return ®istry{ + metrics: make(map[string]metric), + mapper: mapper, + hasher: fnv.New64a(), + } +} + +func (r *registry) metricConflicts(metricName string, metricType metricType) bool { + vector, hasMetric := r.metrics[metricName] + if !hasMetric { + // No metric with this name exists + return false + } + + if vector.metricType == metricType { + // We've found a copy of this metric with this type, but different + // labels, so it's safe to create a new one. + return false + } + + // The metric exists, but it's of a different type than we're trying to + // create. + return true +} + +func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) { + r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl) +} + +func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) { + r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl) +} + +func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) { + r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl) +} + +func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) { + r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl) +} + +func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) { + metric, hasMetric := r.metrics[metricName] + if !hasMetric { + metric.metricType = metricType + metric.vectors = make(map[nameHash]*vector) + metric.metrics = make(map[valueHash]*registeredMetric) + + r.metrics[metricName] = metric + } + + v, ok := metric.vectors[hash.names] + if !ok { + v = &vector{holder: vh} + metric.vectors[hash.names] = v + } + + rm, ok := metric.metrics[hash.values] + if !ok { + rm = ®isteredMetric{ + labels: labels, + ttl: ttl, + metric: mh, + vecKey: hash.names, + } + metric.metrics[hash.values] = rm + v.refCount++ + } + now := clock.Now() + rm.lastRegisteredAt = now + // Update ttl from mapping + rm.ttl = ttl +} + +func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) { + metric, hasMetric := r.metrics[metricName] + + if !hasMetric { + return nil, nil + } + if metric.metricType != metricType { + return nil, nil + } + + rm, ok := metric.metrics[hash.values] + if ok { + return metric.vectors[hash.names].holder, rm.metric + } + + vector, ok := metric.vectors[hash.names] + if ok { + return vector.holder, nil + } + + return nil, nil +} + +func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, CounterMetricType) + if mh != nil { + return mh.(prometheus.Counter), nil + } + + if r.metricConflicts(metricName, CounterMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var counterVec *prometheus.CounterVec + if vh == nil { + metricsCount.WithLabelValues("counter").Inc() + counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { + return nil, err + } + } else { + counterVec = vh.(*prometheus.CounterVec) + } + + var counter prometheus.Counter + var err error + if counter, err = counterVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl) + + return counter, nil +} + +func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, GaugeMetricType) + if mh != nil { + return mh.(prometheus.Gauge), nil + } + + if r.metricConflicts(metricName, GaugeMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var gaugeVec *prometheus.GaugeVec + if vh == nil { + metricsCount.WithLabelValues("gauge").Inc() + gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { + return nil, err + } + } else { + gaugeVec = vh.(*prometheus.GaugeVec) + } + + var gauge prometheus.Gauge + var err error + if gauge, err = gaugeVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl) + + return gauge, nil +} + +func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, HistogramMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.metricConflicts(metricName, HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_sum", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_count", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_bucket", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var histogramVec *prometheus.HistogramVec + if vh == nil { + metricsCount.WithLabelValues("histogram").Inc() + buckets := r.mapper.Defaults.Buckets + if mapping.Buckets != nil && len(mapping.Buckets) > 0 { + buckets = mapping.Buckets + } + histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: metricName, + Help: help, + Buckets: buckets, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { + return nil, err + } + } else { + histogramVec = vh.(*prometheus.HistogramVec) + } + + var observer prometheus.Observer + var err error + if observer, err = histogramVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, SummaryMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.metricConflicts(metricName, SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_sum", SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_count", SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var summaryVec *prometheus.SummaryVec + if vh == nil { + metricsCount.WithLabelValues("summary").Inc() + quantiles := r.mapper.Defaults.Quantiles + if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 { + quantiles = mapping.Quantiles + } + objectives := make(map[float64]float64) + for _, q := range quantiles { + objectives[q.Quantile] = q.Error + } + // In the case of no mapping file, explicitly define the default quantiles + if len(objectives) == 0 { + objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} + } + summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: metricName, + Help: help, + Objectives: objectives, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { + return nil, err + } + } else { + summaryVec = vh.(*prometheus.SummaryVec) + } + + var observer prometheus.Observer + var err error + if observer, err = summaryVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *registry) removeStaleMetrics() { + now := clock.Now() + // delete timeseries with expired ttl + for _, metric := range r.metrics { + for hash, rm := range metric.metrics { + if rm.ttl == 0 { + continue + } + if rm.lastRegisteredAt.Add(rm.ttl).Before(now) { + metric.vectors[rm.vecKey].holder.Delete(rm.labels) + metric.vectors[rm.vecKey].refCount-- + delete(metric.metrics, hash) + } + } + } +} + +// Calculates a hash of both the label names and the label names and values. +func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) { + r.hasher.Reset() + r.nameBuf.Reset() + r.valueBuf.Reset() + labelNames := make([]string, 0, len(labels)) + + for labelName := range labels { + labelNames = append(labelNames, labelName) + } + sort.Strings(labelNames) + + r.valueBuf.WriteByte(model.SeparatorByte) + for _, labelName := range labelNames { + r.valueBuf.WriteString(labels[labelName]) + r.valueBuf.WriteByte(model.SeparatorByte) + + r.nameBuf.WriteString(labelName) + r.nameBuf.WriteByte(model.SeparatorByte) + } + + lh := labelHash{} + r.hasher.Write(r.nameBuf.Bytes()) + lh.names = nameHash(r.hasher.Sum64()) + + // Now add the values to the names we've already hashed. + r.hasher.Write(r.valueBuf.Bytes()) + lh.values = valueHash(r.hasher.Sum64()) + + return lh, labelNames +}