Remove stale timeseries

- ttl is hardcoded — should be in mapping.yaml
- works with metrics without labels

Signed-off-by: Ivan Mikheykin <ivan.mikheykin@flant.com>
This commit is contained in:
Ivan Mikheykin 2018-11-27 15:53:45 +03:00
parent b638b9d808
commit b4e29c5f18

View file

@ -15,17 +15,22 @@ package main
import ( import (
"bufio" "bufio"
"bytes"
"encoding/binary"
"fmt" "fmt"
"hash/fnv"
"io" "io"
"net" "net"
"regexp" "regexp"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time"
"unicode/utf8" "unicode/utf8"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
) )
@ -37,8 +42,15 @@ const (
"consider the effects on your monitoring setup. Error: %s" "consider the effects on your monitoring setup. Error: %s"
) )
// TODO move to mapping config
const metricTtl = time.Duration(5 * time.Second)
var ( var (
illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
hash = fnv.New64a()
strBuf bytes.Buffer // Used for hashing.
intBuf = make([]byte, 8)
) )
func labelsNames(labels prometheus.Labels) []string { func labelsNames(labels prometheus.Labels) []string {
@ -50,6 +62,21 @@ func labelsNames(labels prometheus.Labels) []string {
return names return names
} }
// hashNameAndLabels returns a hash value of the provided name string and all
// the label names and values in the provided labels map.
//
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
// allocations.)
func hashNameAndLabels(name string, labels prometheus.Labels) uint64 {
hash.Reset()
strBuf.Reset()
strBuf.WriteString(name)
hash.Write(strBuf.Bytes())
binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels))
hash.Write(intBuf)
return hash.Sum64()
}
type CounterContainer struct { type CounterContainer struct {
// metric name // metric name
Elements map[string]*prometheus.CounterVec Elements map[string]*prometheus.CounterVec
@ -76,6 +103,12 @@ func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help
return counterVec.GetMetricWith(labels) return counterVec.GetMetricWith(labels)
} }
func (c *CounterContainer) Delete(metricName string, labels prometheus.Labels) {
if _, ok := c.Elements[metricName]; ok {
c.Elements[metricName].Delete(labels)
}
}
type GaugeContainer struct { type GaugeContainer struct {
Elements map[string]*prometheus.GaugeVec Elements map[string]*prometheus.GaugeVec
} }
@ -101,6 +134,12 @@ func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help s
return gaugeVec.GetMetricWith(labels) return gaugeVec.GetMetricWith(labels)
} }
func (c *GaugeContainer) Delete(metricName string, labels prometheus.Labels) {
if _, ok := c.Elements[metricName]; ok {
c.Elements[metricName].Delete(labels)
}
}
type SummaryContainer struct { type SummaryContainer struct {
Elements map[string]*prometheus.SummaryVec Elements map[string]*prometheus.SummaryVec
mapper *mapper.MetricMapper mapper *mapper.MetricMapper
@ -138,6 +177,12 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help
return summaryVec.GetMetricWith(labels) return summaryVec.GetMetricWith(labels)
} }
func (c *SummaryContainer) Delete(metricName string, labels prometheus.Labels) {
if _, ok := c.Elements[metricName]; ok {
c.Elements[metricName].Delete(labels)
}
}
type HistogramContainer struct { type HistogramContainer struct {
Elements map[string]*prometheus.HistogramVec Elements map[string]*prometheus.HistogramVec
mapper *mapper.MetricMapper mapper *mapper.MetricMapper
@ -171,6 +216,12 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he
return histogramVec.GetMetricWith(labels) return histogramVec.GetMetricWith(labels)
} }
func (c *HistogramContainer) Delete(metricName string, labels prometheus.Labels) {
if _, ok := c.Elements[metricName]; ok {
c.Elements[metricName].Delete(labels)
}
}
type Event interface { type Event interface {
MetricName() string MetricName() string
Value() float64 Value() float64
@ -214,12 +265,18 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi
type Events []Event type Events []Event
type LabelValues struct {
lastRegisteredAt time.Time
labels prometheus.Labels
}
type Exporter struct { type Exporter struct {
Counters *CounterContainer Counters *CounterContainer
Gauges *GaugeContainer Gauges *GaugeContainer
Summaries *SummaryContainer Summaries *SummaryContainer
Histograms *HistogramContainer Histograms *HistogramContainer
mapper *mapper.MetricMapper mapper *mapper.MetricMapper
labelValues map[string]map[uint64]*LabelValues
} }
func escapeMetricName(metricName string) string { func escapeMetricName(metricName string) string {
@ -236,14 +293,21 @@ func escapeMetricName(metricName string) string {
// Listen handles all events sent to the given channel sequentially. It // Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed. // terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan Events) { func (b *Exporter) Listen(e <-chan Events) {
removeStaleMetricsTicker := time.NewTicker(time.Second)
for { for {
events, ok := <-e select {
if !ok { case <-removeStaleMetricsTicker.C:
log.Debug("Channel is closed. Break out of Exporter.Listener.") b.removeStaleMetrics()
return case events, ok := <-e:
} if !ok {
for _, event := range events { log.Debug("Channel is closed. Break out of Exporter.Listener.")
b.handleEvent(event) removeStaleMetricsTicker.Stop()
return
}
for _, event := range events {
b.handleEvent(event)
}
} }
} }
} }
@ -293,6 +357,7 @@ func (b *Exporter) handleEvent(event Event) {
) )
if err == nil { if err == nil {
counter.Add(event.Value()) counter.Add(event.Value())
b.saveLabelValues(metricName, prometheusLabels)
eventStats.WithLabelValues("counter").Inc() eventStats.WithLabelValues("counter").Inc()
} else { } else {
@ -313,6 +378,7 @@ func (b *Exporter) handleEvent(event Event) {
} else { } else {
gauge.Set(event.Value()) gauge.Set(event.Value())
} }
b.saveLabelValues(metricName, prometheusLabels)
eventStats.WithLabelValues("gauge").Inc() eventStats.WithLabelValues("gauge").Inc()
} else { } else {
@ -339,6 +405,7 @@ func (b *Exporter) handleEvent(event Event) {
) )
if err == nil { if err == nil {
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
b.saveLabelValues(metricName, prometheusLabels)
eventStats.WithLabelValues("timer").Inc() eventStats.WithLabelValues("timer").Inc()
} else { } else {
log.Debugf(regErrF, metricName, err) log.Debugf(regErrF, metricName, err)
@ -354,6 +421,7 @@ func (b *Exporter) handleEvent(event Event) {
) )
if err == nil { if err == nil {
summary.Observe(event.Value()) summary.Observe(event.Value())
b.saveLabelValues(metricName, prometheusLabels)
eventStats.WithLabelValues("timer").Inc() eventStats.WithLabelValues("timer").Inc()
} else { } else {
log.Debugf(regErrF, metricName, err) log.Debugf(regErrF, metricName, err)
@ -370,13 +438,48 @@ func (b *Exporter) handleEvent(event Event) {
} }
} }
// removeStaleMetrics removes label values set from metric with stale values
func (b *Exporter) removeStaleMetrics() {
now := time.Now()
// delete timeseries with expired ttl
for metricName := range b.labelValues {
for hash, lvs := range b.labelValues[metricName] {
if lvs.lastRegisteredAt.Add(metricTtl).Before(now) {
b.Counters.Delete(metricName, lvs.labels)
b.Gauges.Delete(metricName, lvs.labels)
b.Summaries.Delete(metricName, lvs.labels)
b.Histograms.Delete(metricName, lvs.labels)
delete(b.labelValues[metricName], hash)
}
}
}
}
// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time
func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels) {
_, hasMetric := b.labelValues[metricName]
if !hasMetric {
b.labelValues[metricName] = make(map[uint64]*LabelValues)
}
hash := hashNameAndLabels(metricName, labels)
_, ok := b.labelValues[metricName][hash]
if !ok {
b.labelValues[metricName][hash] = &LabelValues{
labels: labels,
}
}
now := time.Now()
b.labelValues[metricName][hash].lastRegisteredAt = now
}
func NewExporter(mapper *mapper.MetricMapper) *Exporter { func NewExporter(mapper *mapper.MetricMapper) *Exporter {
return &Exporter{ return &Exporter{
Counters: NewCounterContainer(), Counters: NewCounterContainer(),
Gauges: NewGaugeContainer(), Gauges: NewGaugeContainer(),
Summaries: NewSummaryContainer(mapper), Summaries: NewSummaryContainer(mapper),
Histograms: NewHistogramContainer(mapper), Histograms: NewHistogramContainer(mapper),
mapper: mapper, mapper: mapper,
labelValues: make(map[string]map[uint64]*LabelValues, 0),
} }
} }