diff --git a/exporter.go b/exporter.go index ad53ffe..6ec1d6c 100644 --- a/exporter.go +++ b/exporter.go @@ -15,17 +15,22 @@ package main import ( "bufio" + "bytes" + "encoding/binary" "fmt" + "hash/fnv" "io" "net" "regexp" "sort" "strconv" "strings" + "time" "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" + "github.com/prometheus/common/model" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -37,8 +42,15 @@ const ( "consider the effects on your monitoring setup. Error: %s" ) +// TODO move to mapping config +const metricTtl = time.Duration(5 * time.Second) + var ( illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) + + hash = fnv.New64a() + strBuf bytes.Buffer // Used for hashing. + intBuf = make([]byte, 8) ) func labelsNames(labels prometheus.Labels) []string { @@ -50,6 +62,21 @@ func labelsNames(labels prometheus.Labels) []string { return names } +// hashNameAndLabels returns a hash value of the provided name string and all +// the label names and values in the provided labels map. +// +// Not safe for concurrent use! (Uses a shared buffer and hasher to save on +// allocations.) +func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { + hash.Reset() + strBuf.Reset() + strBuf.WriteString(name) + hash.Write(strBuf.Bytes()) + binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) + hash.Write(intBuf) + return hash.Sum64() +} + type CounterContainer struct { // metric name Elements map[string]*prometheus.CounterVec @@ -76,6 +103,12 @@ func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help return counterVec.GetMetricWith(labels) } +func (c *CounterContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type GaugeContainer struct { Elements map[string]*prometheus.GaugeVec } @@ -101,6 +134,12 @@ func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help s return gaugeVec.GetMetricWith(labels) } +func (c *GaugeContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type SummaryContainer struct { Elements map[string]*prometheus.SummaryVec mapper *mapper.MetricMapper @@ -138,6 +177,12 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help return summaryVec.GetMetricWith(labels) } +func (c *SummaryContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type HistogramContainer struct { Elements map[string]*prometheus.HistogramVec mapper *mapper.MetricMapper @@ -171,6 +216,12 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he return histogramVec.GetMetricWith(labels) } +func (c *HistogramContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type Event interface { MetricName() string Value() float64 @@ -214,12 +265,18 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi type Events []Event +type LabelValues struct { + lastRegisteredAt time.Time + labels prometheus.Labels +} + type Exporter struct { - Counters *CounterContainer - Gauges *GaugeContainer - Summaries *SummaryContainer - Histograms *HistogramContainer - mapper *mapper.MetricMapper + Counters *CounterContainer + Gauges *GaugeContainer + Summaries *SummaryContainer + Histograms *HistogramContainer + mapper *mapper.MetricMapper + labelValues map[string]map[uint64]*LabelValues } func escapeMetricName(metricName string) string { @@ -236,14 +293,21 @@ func escapeMetricName(metricName string) string { // Listen handles all events sent to the given channel sequentially. It // terminates when the channel is closed. func (b *Exporter) Listen(e <-chan Events) { + removeStaleMetricsTicker := time.NewTicker(time.Second) + for { - events, ok := <-e - if !ok { - log.Debug("Channel is closed. Break out of Exporter.Listener.") - return - } - for _, event := range events { - b.handleEvent(event) + select { + case <-removeStaleMetricsTicker.C: + b.removeStaleMetrics() + case events, ok := <-e: + if !ok { + log.Debug("Channel is closed. Break out of Exporter.Listener.") + removeStaleMetricsTicker.Stop() + return + } + for _, event := range events { + b.handleEvent(event) + } } } } @@ -293,6 +357,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { counter.Add(event.Value()) + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("counter").Inc() } else { @@ -313,6 +378,7 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("gauge").Inc() } else { @@ -339,6 +405,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -354,6 +421,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { summary.Observe(event.Value()) + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -370,13 +438,48 @@ func (b *Exporter) handleEvent(event Event) { } } +// removeStaleMetrics removes label values set from metric with stale values +func (b *Exporter) removeStaleMetrics() { + now := time.Now() + // delete timeseries with expired ttl + for metricName := range b.labelValues { + for hash, lvs := range b.labelValues[metricName] { + if lvs.lastRegisteredAt.Add(metricTtl).Before(now) { + b.Counters.Delete(metricName, lvs.labels) + b.Gauges.Delete(metricName, lvs.labels) + b.Summaries.Delete(metricName, lvs.labels) + b.Histograms.Delete(metricName, lvs.labels) + delete(b.labelValues[metricName], hash) + } + } + } +} + +// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time +func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels) { + _, hasMetric := b.labelValues[metricName] + if !hasMetric { + b.labelValues[metricName] = make(map[uint64]*LabelValues) + } + hash := hashNameAndLabels(metricName, labels) + _, ok := b.labelValues[metricName][hash] + if !ok { + b.labelValues[metricName][hash] = &LabelValues{ + labels: labels, + } + } + now := time.Now() + b.labelValues[metricName][hash].lastRegisteredAt = now +} + func NewExporter(mapper *mapper.MetricMapper) *Exporter { return &Exporter{ - Counters: NewCounterContainer(), - Gauges: NewGaugeContainer(), - Summaries: NewSummaryContainer(mapper), - Histograms: NewHistogramContainer(mapper), - mapper: mapper, + Counters: NewCounterContainer(), + Gauges: NewGaugeContainer(), + Summaries: NewSummaryContainer(mapper), + Histograms: NewHistogramContainer(mapper), + mapper: mapper, + labelValues: make(map[string]map[uint64]*LabelValues, 0), } }