forked from mirrors/statsd_exporter
Rework metric registration and tracking
This reworks/rewrites the way that metric registration and tracking is handled across all of statsd_exporter. The goal here is to reduce memory and cpu usage, but also to reduce complexity by unifying metric registration with the ttl tracking for expiration. Some high level notes: * Previously metric names and labels were being hashed three times for every event accepted: in the container code, the save label set code and again in the prometheus client libraries. This unifies the first two and caches the results of `GetMetricWith` to avoid the third. * This optimizes the label hashing to reduce cpu overhead and memory allocations. The label hashing code previously showed up high on all profiling done for both CPU and memory allocations Using the BenchmarkExporterListener benchmark, the improvement looks like this. Before: cpu: 11,341,797 ns/op memory allocated: 1,731,119 B/op memory allocations: 58,028 allocs/op After: cpu: 7,084,651 ns/op memory allocated: 906,556 B/op memory allocations: 42,026 allocs/op Signed-off-by: Clayton O'Neill <claytono@github.com>
This commit is contained in:
parent
7a107f899f
commit
7c30120dbc
3 changed files with 428 additions and 51 deletions
59
exporter.go
59
exporter.go
|
@ -40,7 +40,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hash = fnv.New64a()
|
fnvHash = fnv.New64a()
|
||||||
strBuf bytes.Buffer // Used for hashing.
|
strBuf bytes.Buffer // Used for hashing.
|
||||||
intBuf = make([]byte, 8)
|
intBuf = make([]byte, 8)
|
||||||
)
|
)
|
||||||
|
@ -56,15 +56,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
|
||||||
u.c.Collect(c)
|
u.c.Collect(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
CounterMetricType metricType = iota
|
|
||||||
GaugeMetricType
|
|
||||||
SummaryMetricType
|
|
||||||
HistogramMetricType
|
|
||||||
)
|
|
||||||
|
|
||||||
type metricChecker interface {
|
type metricChecker interface {
|
||||||
metricConflicts(string, metricType) bool
|
metricConflicts(string, metricType) bool
|
||||||
}
|
}
|
||||||
|
@ -88,7 +79,7 @@ func getContainerMapKey(metricName string, labelNames []string) string {
|
||||||
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
|
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
|
||||||
// allocations.)
|
// allocations.)
|
||||||
func hashNameAndLabels(name string, labelNames []string, labels prometheus.Labels) uint64 {
|
func hashNameAndLabels(name string, labelNames []string, labels prometheus.Labels) uint64 {
|
||||||
hash.Reset()
|
fnvHash.Reset()
|
||||||
strBuf.Reset()
|
strBuf.Reset()
|
||||||
strBuf.WriteString(name)
|
strBuf.WriteString(name)
|
||||||
strBuf.WriteByte(model.SeparatorByte)
|
strBuf.WriteByte(model.SeparatorByte)
|
||||||
|
@ -100,8 +91,8 @@ func hashNameAndLabels(name string, labelNames []string, labels prometheus.Label
|
||||||
strBuf.WriteByte(model.SeparatorByte)
|
strBuf.WriteByte(model.SeparatorByte)
|
||||||
}
|
}
|
||||||
|
|
||||||
hash.Write(strBuf.Bytes())
|
fnvHash.Write(strBuf.Bytes())
|
||||||
return hash.Sum64()
|
return fnvHash.Sum64()
|
||||||
}
|
}
|
||||||
|
|
||||||
type CounterContainer struct {
|
type CounterContainer struct {
|
||||||
|
@ -356,6 +347,7 @@ type Exporter struct {
|
||||||
Summaries *SummaryContainer
|
Summaries *SummaryContainer
|
||||||
Histograms *HistogramContainer
|
Histograms *HistogramContainer
|
||||||
mapper *mapper.MetricMapper
|
mapper *mapper.MetricMapper
|
||||||
|
registry *registry
|
||||||
labelValues map[string]map[uint64]*LabelValues
|
labelValues map[string]map[uint64]*LabelValues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,6 +415,7 @@ func (b *Exporter) Listen(e <-chan Events) {
|
||||||
select {
|
select {
|
||||||
case <-removeStaleMetricsTicker.C:
|
case <-removeStaleMetricsTicker.C:
|
||||||
b.removeStaleMetrics()
|
b.removeStaleMetrics()
|
||||||
|
b.registry.removeStaleMetrics()
|
||||||
case events, ok := <-e:
|
case events, ok := <-e:
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
||||||
|
@ -474,7 +467,6 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
metricName = escapeMetricName(event.MetricName())
|
metricName = escapeMetricName(event.MetricName())
|
||||||
}
|
}
|
||||||
|
|
||||||
sortedLabelNames := getSortedLabelNames(prometheusLabels)
|
|
||||||
switch ev := event.(type) {
|
switch ev := event.(type) {
|
||||||
case *CounterEvent:
|
case *CounterEvent:
|
||||||
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
||||||
|
@ -485,16 +477,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
counter, err := b.Counters.Get(
|
counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
counter.Add(event.Value())
|
counter.Add(event.Value())
|
||||||
b.saveLabelValues(metricName, CounterMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("counter").Inc()
|
eventStats.WithLabelValues("counter").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -502,13 +487,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case *GaugeEvent:
|
case *GaugeEvent:
|
||||||
gauge, err := b.Gauges.Get(
|
gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
)
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if ev.relative {
|
if ev.relative {
|
||||||
|
@ -516,7 +495,6 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
} else {
|
} else {
|
||||||
gauge.Set(event.Value())
|
gauge.Set(event.Value())
|
||||||
}
|
}
|
||||||
b.saveLabelValues(metricName, GaugeMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("gauge").Inc()
|
eventStats.WithLabelValues("gauge").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -534,17 +512,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
|
|
||||||
switch t {
|
switch t {
|
||||||
case mapper.TimerTypeHistogram:
|
case mapper.TimerTypeHistogram:
|
||||||
histogram, err := b.Histograms.Get(
|
histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
mapping,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
||||||
b.saveLabelValues(metricName, HistogramMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -552,17 +522,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
|
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
|
||||||
summary, err := b.Summaries.Get(
|
summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping)
|
||||||
metricName,
|
|
||||||
sortedLabelNames,
|
|
||||||
prometheusLabels,
|
|
||||||
b,
|
|
||||||
help,
|
|
||||||
mapping,
|
|
||||||
)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
||||||
b.saveLabelValues(metricName, SummaryMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl)
|
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -653,6 +615,7 @@ func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
||||||
Summaries: NewSummaryContainer(mapper),
|
Summaries: NewSummaryContainer(mapper),
|
||||||
Histograms: NewHistogramContainer(mapper),
|
Histograms: NewHistogramContainer(mapper),
|
||||||
mapper: mapper,
|
mapper: mapper,
|
||||||
|
registry: newRegistry(mapper),
|
||||||
labelValues: make(map[string]map[uint64]*LabelValues),
|
labelValues: make(map[string]map[uint64]*LabelValues),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -531,7 +531,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
|
||||||
|
|
||||||
metrics, err := prometheus.DefaultGatherer.Gather()
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Gather should not fail")
|
t.Fatal("Gather should not fail: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var metricFamily *dto.MetricFamily
|
var metricFamily *dto.MetricFamily
|
||||||
|
|
414
registry.go
Normal file
414
registry.go
Normal file
|
@ -0,0 +1,414 @@
|
||||||
|
// Copyright 2013 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"hash"
|
||||||
|
"hash/fnv"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
|
)
|
||||||
|
|
||||||
|
type metricType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CounterMetricType metricType = iota
|
||||||
|
GaugeMetricType
|
||||||
|
SummaryMetricType
|
||||||
|
HistogramMetricType
|
||||||
|
)
|
||||||
|
|
||||||
|
type nameHash uint64
|
||||||
|
type valueHash uint64
|
||||||
|
type labelHash struct {
|
||||||
|
// This is a hash over the label names
|
||||||
|
names nameHash
|
||||||
|
// This is a hash over the label names + label values
|
||||||
|
values valueHash
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricHolder interface{}
|
||||||
|
|
||||||
|
type registeredMetric struct {
|
||||||
|
lastRegisteredAt time.Time
|
||||||
|
labels prometheus.Labels
|
||||||
|
ttl time.Duration
|
||||||
|
metric metricHolder
|
||||||
|
vecKey nameHash
|
||||||
|
}
|
||||||
|
|
||||||
|
type vectorHolder interface {
|
||||||
|
Delete(label prometheus.Labels) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type vector struct {
|
||||||
|
holder vectorHolder
|
||||||
|
refCount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type metric struct {
|
||||||
|
metricType metricType
|
||||||
|
// Vectors key is the hash of the label names
|
||||||
|
vectors map[nameHash]*vector
|
||||||
|
// Metrics key is a hash of the label names + label values
|
||||||
|
metrics map[valueHash]*registeredMetric
|
||||||
|
}
|
||||||
|
|
||||||
|
type registry struct {
|
||||||
|
metrics map[string]metric
|
||||||
|
mapper *mapper.MetricMapper
|
||||||
|
// The below value and label variables are allocated in the registry struct
|
||||||
|
// so that we don't have to allocate them every time have to compute a label
|
||||||
|
// hash.
|
||||||
|
valueBuf, nameBuf bytes.Buffer
|
||||||
|
valueHasher, nameHasher hash.Hash64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRegistry(mapper *mapper.MetricMapper) *registry {
|
||||||
|
return ®istry{
|
||||||
|
metrics: make(map[string]metric),
|
||||||
|
mapper: mapper,
|
||||||
|
valueHasher: fnv.New64a(),
|
||||||
|
nameHasher: fnv.New64a(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) metricConflicts(metricName string, metricType metricType) bool {
|
||||||
|
vector, hasMetric := r.metrics[metricName]
|
||||||
|
if !hasMetric {
|
||||||
|
// No metric with this name exists
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if vector.metricType == metricType {
|
||||||
|
// We've found a copy of this metric with this type, but different
|
||||||
|
// labels, so it's safe to create a new one.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// The metric exists, but it's of a different type than we're trying to
|
||||||
|
// create.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) {
|
||||||
|
r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) {
|
||||||
|
metric, hasMetric := r.metrics[metricName]
|
||||||
|
if !hasMetric {
|
||||||
|
metric.metricType = metricType
|
||||||
|
metric.vectors = make(map[nameHash]*vector)
|
||||||
|
metric.metrics = make(map[valueHash]*registeredMetric)
|
||||||
|
|
||||||
|
r.metrics[metricName] = metric
|
||||||
|
}
|
||||||
|
|
||||||
|
v, ok := metric.vectors[hash.names]
|
||||||
|
if !ok {
|
||||||
|
v = &vector{holder: vh}
|
||||||
|
metric.vectors[hash.names] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
rm, ok := metric.metrics[hash.values]
|
||||||
|
if !ok {
|
||||||
|
rm = ®isteredMetric{
|
||||||
|
labels: labels,
|
||||||
|
ttl: ttl,
|
||||||
|
metric: mh,
|
||||||
|
vecKey: hash.names,
|
||||||
|
}
|
||||||
|
metric.metrics[hash.values] = rm
|
||||||
|
v.refCount++
|
||||||
|
}
|
||||||
|
now := clock.Now()
|
||||||
|
rm.lastRegisteredAt = now
|
||||||
|
// Update ttl from mapping
|
||||||
|
rm.ttl = ttl
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) {
|
||||||
|
metric, hasMetric := r.metrics[metricName]
|
||||||
|
|
||||||
|
if !hasMetric {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if metric.metricType != metricType {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rm, ok := metric.metrics[hash.values]
|
||||||
|
if ok {
|
||||||
|
return metric.vectors[hash.names].holder, rm.metric
|
||||||
|
}
|
||||||
|
|
||||||
|
vector, ok := metric.vectors[hash.names]
|
||||||
|
if ok {
|
||||||
|
return vector.holder, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, CounterMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Counter), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, CounterMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var counterVec *prometheus.CounterVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("counter").Inc()
|
||||||
|
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
counterVec = vh.(*prometheus.CounterVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var counter prometheus.Counter
|
||||||
|
var err error
|
||||||
|
if counter, err = counterVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl)
|
||||||
|
|
||||||
|
return counter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, GaugeMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Gauge), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, GaugeMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gaugeVec *prometheus.GaugeVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("gauge").Inc()
|
||||||
|
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
gaugeVec = vh.(*prometheus.GaugeVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gauge prometheus.Gauge
|
||||||
|
var err error
|
||||||
|
if gauge, err = gaugeVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl)
|
||||||
|
|
||||||
|
return gauge, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, HistogramMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Observer), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_sum", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_count", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_bucket", HistogramMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var histogramVec *prometheus.HistogramVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("histogram").Inc()
|
||||||
|
buckets := r.mapper.Defaults.Buckets
|
||||||
|
if mapping.Buckets != nil && len(mapping.Buckets) > 0 {
|
||||||
|
buckets = mapping.Buckets
|
||||||
|
}
|
||||||
|
histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
Buckets: buckets,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
histogramVec = vh.(*prometheus.HistogramVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var observer prometheus.Observer
|
||||||
|
var err error
|
||||||
|
if observer, err = histogramVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl)
|
||||||
|
|
||||||
|
return observer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
|
hash, labelNames := r.hashLabels(labels)
|
||||||
|
vh, mh := r.get(metricName, hash, SummaryMetricType)
|
||||||
|
if mh != nil {
|
||||||
|
return mh.(prometheus.Observer), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.metricConflicts(metricName, SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_sum", SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
if r.metricConflicts(metricName+"_count", SummaryMetricType) {
|
||||||
|
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
var summaryVec *prometheus.SummaryVec
|
||||||
|
if vh == nil {
|
||||||
|
metricsCount.WithLabelValues("summary").Inc()
|
||||||
|
quantiles := r.mapper.Defaults.Quantiles
|
||||||
|
if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 {
|
||||||
|
quantiles = mapping.Quantiles
|
||||||
|
}
|
||||||
|
objectives := make(map[float64]float64)
|
||||||
|
for _, q := range quantiles {
|
||||||
|
objectives[q.Quantile] = q.Error
|
||||||
|
}
|
||||||
|
// In the case of no mapping file, explicitly define the default quantiles
|
||||||
|
if len(objectives) == 0 {
|
||||||
|
objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
|
||||||
|
}
|
||||||
|
summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||||
|
Name: metricName,
|
||||||
|
Help: help,
|
||||||
|
Objectives: objectives,
|
||||||
|
}, labelNames)
|
||||||
|
|
||||||
|
if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
summaryVec = vh.(*prometheus.SummaryVec)
|
||||||
|
}
|
||||||
|
|
||||||
|
var observer prometheus.Observer
|
||||||
|
var err error
|
||||||
|
if observer, err = summaryVec.GetMetricWith(labels); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl)
|
||||||
|
|
||||||
|
return observer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *registry) removeStaleMetrics() {
|
||||||
|
now := clock.Now()
|
||||||
|
// delete timeseries with expired ttl
|
||||||
|
for metricName, metric := range r.metrics {
|
||||||
|
for hash, rm := range metric.metrics {
|
||||||
|
if rm.ttl == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if rm.lastRegisteredAt.Add(rm.ttl).Before(now) {
|
||||||
|
metric.vectors[rm.vecKey].holder.Delete(rm.labels)
|
||||||
|
metric.vectors[rm.vecKey].refCount--
|
||||||
|
delete(metric.metrics, hash)
|
||||||
|
}
|
||||||
|
// If no individual instances exist, then delete the entire vector
|
||||||
|
if metric.vectors[rm.vecKey].refCount == 0 {
|
||||||
|
delete(r.metrics, metricName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculates a hash of both the label names and the label names + label values.
|
||||||
|
// Note that this function goes to silly lengths to avoid allocating memory.
|
||||||
|
// This is in the fast path for every metric ingested and allocating and gc'ing
|
||||||
|
// temporary memory is expensive.
|
||||||
|
func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) {
|
||||||
|
r.nameHasher.Reset()
|
||||||
|
r.valueHasher.Reset()
|
||||||
|
r.nameBuf.Reset()
|
||||||
|
r.valueBuf.Reset()
|
||||||
|
labelNames := make([]string, 0, len(labels))
|
||||||
|
|
||||||
|
for labelName := range labels {
|
||||||
|
labelNames = append(labelNames, labelName)
|
||||||
|
}
|
||||||
|
sort.Strings(labelNames)
|
||||||
|
|
||||||
|
r.valueBuf.WriteByte(model.SeparatorByte)
|
||||||
|
for _, labelName := range labelNames {
|
||||||
|
r.valueBuf.WriteString(labels[labelName])
|
||||||
|
r.valueBuf.WriteByte(model.SeparatorByte)
|
||||||
|
|
||||||
|
r.nameBuf.WriteString(labelName)
|
||||||
|
r.nameBuf.WriteByte(model.SeparatorByte)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.nameHasher.Write(r.nameBuf.Bytes())
|
||||||
|
r.valueHasher.Write(r.nameBuf.Bytes())
|
||||||
|
r.valueHasher.Write(r.valueBuf.Bytes())
|
||||||
|
|
||||||
|
return labelHash{
|
||||||
|
names: nameHash(r.nameHasher.Sum64()),
|
||||||
|
values: valueHash(r.valueHasher.Sum64())}, labelNames
|
||||||
|
}
|
Loading…
Reference in a new issue