mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-11-26 09:11:01 +00:00
Merge pull request #164 from flant/remove_stale_metrics
Remove stale metrics
This commit is contained in:
commit
7364c6fe44
7 changed files with 508 additions and 121 deletions
13
README.md
13
README.md
|
@ -268,6 +268,7 @@ defaults:
|
||||||
buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ]
|
buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ]
|
||||||
match_type: glob
|
match_type: glob
|
||||||
glob_disable_ordering: false
|
glob_disable_ordering: false
|
||||||
|
ttl: 0 # metrics do not expire
|
||||||
mappings:
|
mappings:
|
||||||
# This will be a histogram using the buckets set in `defaults`.
|
# This will be a histogram using the buckets set in `defaults`.
|
||||||
- match: test.timing.*.*.*
|
- match: test.timing.*.*.*
|
||||||
|
@ -350,6 +351,18 @@ mappings:
|
||||||
|
|
||||||
Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.
|
Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.
|
||||||
|
|
||||||
|
### Time series expiration
|
||||||
|
|
||||||
|
The `ttl` parameter can be used to define the expiration time for stale metrics.
|
||||||
|
The value is a time duration with valid time units: "ns", "us" (or "µs"),
|
||||||
|
"ms", "s", "m", "h". For example, `ttl: 1m20s`. `0` value is used to indicate
|
||||||
|
metrics that do not expire.
|
||||||
|
|
||||||
|
TTLs are applied to each mapped metric name/labels combination whenever
|
||||||
|
new samples are received. This means that you cannot immediately expire a
|
||||||
|
metric only by changing the mapping configuration. At least one sample must
|
||||||
|
be received for updated mappings to take effect.
|
||||||
|
|
||||||
## Using Docker
|
## Using Docker
|
||||||
|
|
||||||
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.
|
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.
|
||||||
|
|
233
exporter.go
233
exporter.go
|
@ -22,14 +22,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -48,6 +51,15 @@ var (
|
||||||
intBuf = make([]byte, 8)
|
intBuf = make([]byte, 8)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func labelNames(labels prometheus.Labels) []string {
|
||||||
|
names := make([]string, 0, len(labels))
|
||||||
|
for labelName := range labels {
|
||||||
|
names = append(names, labelName)
|
||||||
|
}
|
||||||
|
sort.Strings(names)
|
||||||
|
return names
|
||||||
|
}
|
||||||
|
|
||||||
// hashNameAndLabels returns a hash value of the provided name string and all
|
// hashNameAndLabels returns a hash value of the provided name string and all
|
||||||
// the label names and values in the provided labels map.
|
// the label names and values in the provided labels map.
|
||||||
//
|
//
|
||||||
|
@ -64,74 +76,82 @@ func hashNameAndLabels(name string, labels prometheus.Labels) uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
type CounterContainer struct {
|
type CounterContainer struct {
|
||||||
Elements map[uint64]prometheus.Counter
|
// metric name
|
||||||
|
Elements map[string]*prometheus.CounterVec
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCounterContainer() *CounterContainer {
|
func NewCounterContainer() *CounterContainer {
|
||||||
return &CounterContainer{
|
return &CounterContainer{
|
||||||
Elements: make(map[uint64]prometheus.Counter),
|
Elements: make(map[string]*prometheus.CounterVec),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Counter, error) {
|
func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Counter, error) {
|
||||||
hash := hashNameAndLabels(metricName, labels)
|
counterVec, ok := c.Elements[metricName]
|
||||||
counter, ok := c.Elements[hash]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
counter = prometheus.NewCounter(prometheus.CounterOpts{
|
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Name: metricName,
|
Name: metricName,
|
||||||
Help: help,
|
Help: help,
|
||||||
ConstLabels: labels,
|
}, labelNames(labels))
|
||||||
})
|
if err := prometheus.Register(counterVec); err != nil {
|
||||||
if err := prometheus.Register(counter); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.Elements[hash] = counter
|
c.Elements[metricName] = counterVec
|
||||||
|
}
|
||||||
|
return counterVec.GetMetricWith(labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CounterContainer) Delete(metricName string, labels prometheus.Labels) {
|
||||||
|
if _, ok := c.Elements[metricName]; ok {
|
||||||
|
c.Elements[metricName].Delete(labels)
|
||||||
}
|
}
|
||||||
return counter, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type GaugeContainer struct {
|
type GaugeContainer struct {
|
||||||
Elements map[uint64]prometheus.Gauge
|
Elements map[string]*prometheus.GaugeVec
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGaugeContainer() *GaugeContainer {
|
func NewGaugeContainer() *GaugeContainer {
|
||||||
return &GaugeContainer{
|
return &GaugeContainer{
|
||||||
Elements: make(map[uint64]prometheus.Gauge),
|
Elements: make(map[string]*prometheus.GaugeVec),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Gauge, error) {
|
func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Gauge, error) {
|
||||||
hash := hashNameAndLabels(metricName, labels)
|
gaugeVec, ok := c.Elements[metricName]
|
||||||
gauge, ok := c.Elements[hash]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
gauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
Name: metricName,
|
Name: metricName,
|
||||||
Help: help,
|
Help: help,
|
||||||
ConstLabels: labels,
|
}, labelNames(labels))
|
||||||
})
|
if err := prometheus.Register(gaugeVec); err != nil {
|
||||||
if err := prometheus.Register(gauge); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.Elements[hash] = gauge
|
c.Elements[metricName] = gaugeVec
|
||||||
|
}
|
||||||
|
return gaugeVec.GetMetricWith(labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *GaugeContainer) Delete(metricName string, labels prometheus.Labels) {
|
||||||
|
if _, ok := c.Elements[metricName]; ok {
|
||||||
|
c.Elements[metricName].Delete(labels)
|
||||||
}
|
}
|
||||||
return gauge, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type SummaryContainer struct {
|
type SummaryContainer struct {
|
||||||
Elements map[uint64]prometheus.Summary
|
Elements map[string]*prometheus.SummaryVec
|
||||||
mapper *mapper.MetricMapper
|
mapper *mapper.MetricMapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer {
|
func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer {
|
||||||
return &SummaryContainer{
|
return &SummaryContainer{
|
||||||
Elements: make(map[uint64]prometheus.Summary),
|
Elements: make(map[string]*prometheus.SummaryVec),
|
||||||
mapper: mapper,
|
mapper: mapper,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Summary, error) {
|
func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
hash := hashNameAndLabels(metricName, labels)
|
summaryVec, ok := c.Elements[metricName]
|
||||||
summary, ok := c.Elements[hash]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
quantiles := c.mapper.Defaults.Quantiles
|
quantiles := c.mapper.Defaults.Quantiles
|
||||||
if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 {
|
if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 {
|
||||||
|
@ -141,54 +161,63 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help
|
||||||
for _, q := range quantiles {
|
for _, q := range quantiles {
|
||||||
objectives[q.Quantile] = q.Error
|
objectives[q.Quantile] = q.Error
|
||||||
}
|
}
|
||||||
summary = prometheus.NewSummary(
|
summaryVec = prometheus.NewSummaryVec(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Name: metricName,
|
Name: metricName,
|
||||||
Help: help,
|
Help: help,
|
||||||
ConstLabels: labels,
|
Objectives: objectives,
|
||||||
Objectives: objectives,
|
}, labelNames(labels))
|
||||||
})
|
if err := prometheus.Register(summaryVec); err != nil {
|
||||||
if err := prometheus.Register(summary); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.Elements[hash] = summary
|
c.Elements[metricName] = summaryVec
|
||||||
|
}
|
||||||
|
return summaryVec.GetMetricWith(labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SummaryContainer) Delete(metricName string, labels prometheus.Labels) {
|
||||||
|
if _, ok := c.Elements[metricName]; ok {
|
||||||
|
c.Elements[metricName].Delete(labels)
|
||||||
}
|
}
|
||||||
return summary, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type HistogramContainer struct {
|
type HistogramContainer struct {
|
||||||
Elements map[uint64]prometheus.Histogram
|
Elements map[string]*prometheus.HistogramVec
|
||||||
mapper *mapper.MetricMapper
|
mapper *mapper.MetricMapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer {
|
func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer {
|
||||||
return &HistogramContainer{
|
return &HistogramContainer{
|
||||||
Elements: make(map[uint64]prometheus.Histogram),
|
Elements: make(map[string]*prometheus.HistogramVec),
|
||||||
mapper: mapper,
|
mapper: mapper,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Histogram, error) {
|
func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
|
||||||
hash := hashNameAndLabels(metricName, labels)
|
histogramVec, ok := c.Elements[metricName]
|
||||||
histogram, ok := c.Elements[hash]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
buckets := c.mapper.Defaults.Buckets
|
buckets := c.mapper.Defaults.Buckets
|
||||||
if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 {
|
if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 {
|
||||||
buckets = mapping.Buckets
|
buckets = mapping.Buckets
|
||||||
}
|
}
|
||||||
histogram = prometheus.NewHistogram(
|
histogramVec = prometheus.NewHistogramVec(
|
||||||
prometheus.HistogramOpts{
|
prometheus.HistogramOpts{
|
||||||
Name: metricName,
|
Name: metricName,
|
||||||
Help: help,
|
Help: help,
|
||||||
ConstLabels: labels,
|
Buckets: buckets,
|
||||||
Buckets: buckets,
|
}, labelNames(labels))
|
||||||
})
|
if err := prometheus.Register(histogramVec); err != nil {
|
||||||
c.Elements[hash] = histogram
|
|
||||||
if err := prometheus.Register(histogram); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
c.Elements[metricName] = histogramVec
|
||||||
|
}
|
||||||
|
return histogramVec.GetMetricWith(labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *HistogramContainer) Delete(metricName string, labels prometheus.Labels) {
|
||||||
|
if _, ok := c.Elements[metricName]; ok {
|
||||||
|
c.Elements[metricName].Delete(labels)
|
||||||
}
|
}
|
||||||
return histogram, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Event interface {
|
type Event interface {
|
||||||
|
@ -234,12 +263,19 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi
|
||||||
|
|
||||||
type Events []Event
|
type Events []Event
|
||||||
|
|
||||||
|
type LabelValues struct {
|
||||||
|
lastRegisteredAt time.Time
|
||||||
|
labels prometheus.Labels
|
||||||
|
ttl time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
type Exporter struct {
|
type Exporter struct {
|
||||||
Counters *CounterContainer
|
Counters *CounterContainer
|
||||||
Gauges *GaugeContainer
|
Gauges *GaugeContainer
|
||||||
Summaries *SummaryContainer
|
Summaries *SummaryContainer
|
||||||
Histograms *HistogramContainer
|
Histograms *HistogramContainer
|
||||||
mapper *mapper.MetricMapper
|
mapper *mapper.MetricMapper
|
||||||
|
labelValues map[string]map[uint64]*LabelValues
|
||||||
}
|
}
|
||||||
|
|
||||||
func escapeMetricName(metricName string) string {
|
func escapeMetricName(metricName string) string {
|
||||||
|
@ -256,14 +292,21 @@ func escapeMetricName(metricName string) string {
|
||||||
// Listen handles all events sent to the given channel sequentially. It
|
// Listen handles all events sent to the given channel sequentially. It
|
||||||
// terminates when the channel is closed.
|
// terminates when the channel is closed.
|
||||||
func (b *Exporter) Listen(e <-chan Events) {
|
func (b *Exporter) Listen(e <-chan Events) {
|
||||||
|
removeStaleMetricsTicker := clock.NewTicker(time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
events, ok := <-e
|
select {
|
||||||
if !ok {
|
case <-removeStaleMetricsTicker.C:
|
||||||
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
b.removeStaleMetrics()
|
||||||
return
|
case events, ok := <-e:
|
||||||
}
|
if !ok {
|
||||||
for _, event := range events {
|
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
||||||
b.handleEvent(event)
|
removeStaleMetricsTicker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, event := range events {
|
||||||
|
b.handleEvent(event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -273,6 +316,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
|
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
|
||||||
if mapping == nil {
|
if mapping == nil {
|
||||||
mapping = &mapper.MetricMapping{}
|
mapping = &mapper.MetricMapping{}
|
||||||
|
if b.mapper.Defaults.Ttl != 0 {
|
||||||
|
mapping.Ttl = b.mapper.Defaults.Ttl
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if mapping.Action == mapper.ActionTypeDrop {
|
if mapping.Action == mapper.ActionTypeDrop {
|
||||||
|
@ -313,7 +359,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
)
|
)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
counter.Add(event.Value())
|
counter.Add(event.Value())
|
||||||
|
b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl)
|
||||||
eventStats.WithLabelValues("counter").Inc()
|
eventStats.WithLabelValues("counter").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -333,7 +379,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
} else {
|
} else {
|
||||||
gauge.Set(event.Value())
|
gauge.Set(event.Value())
|
||||||
}
|
}
|
||||||
|
b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl)
|
||||||
eventStats.WithLabelValues("gauge").Inc()
|
eventStats.WithLabelValues("gauge").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -359,6 +405,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
)
|
)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
||||||
|
b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl)
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -374,6 +421,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
)
|
)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
summary.Observe(event.Value())
|
summary.Observe(event.Value())
|
||||||
|
b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl)
|
||||||
eventStats.WithLabelValues("timer").Inc()
|
eventStats.WithLabelValues("timer").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Debugf(regErrF, metricName, err)
|
log.Debugf(regErrF, metricName, err)
|
||||||
|
@ -390,13 +438,56 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeStaleMetrics removes label values set from metric with stale values
|
||||||
|
func (b *Exporter) removeStaleMetrics() {
|
||||||
|
now := clock.Now()
|
||||||
|
// delete timeseries with expired ttl
|
||||||
|
for metricName := range b.labelValues {
|
||||||
|
for hash, lvs := range b.labelValues[metricName] {
|
||||||
|
if lvs.ttl == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) {
|
||||||
|
b.Counters.Delete(metricName, lvs.labels)
|
||||||
|
b.Gauges.Delete(metricName, lvs.labels)
|
||||||
|
b.Summaries.Delete(metricName, lvs.labels)
|
||||||
|
b.Histograms.Delete(metricName, lvs.labels)
|
||||||
|
delete(b.labelValues[metricName], hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value
|
||||||
|
func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) {
|
||||||
|
metric, hasMetric := b.labelValues[metricName]
|
||||||
|
if !hasMetric {
|
||||||
|
metric = make(map[uint64]*LabelValues)
|
||||||
|
b.labelValues[metricName] = metric
|
||||||
|
}
|
||||||
|
hash := hashNameAndLabels(metricName, labels)
|
||||||
|
metricLabelValues, ok := metric[hash]
|
||||||
|
if !ok {
|
||||||
|
metricLabelValues = &LabelValues{
|
||||||
|
labels: labels,
|
||||||
|
ttl: ttl,
|
||||||
|
}
|
||||||
|
b.labelValues[metricName][hash] = metricLabelValues
|
||||||
|
}
|
||||||
|
now := clock.Now()
|
||||||
|
metricLabelValues.lastRegisteredAt = now
|
||||||
|
// Update ttl from mapping
|
||||||
|
metricLabelValues.ttl = ttl
|
||||||
|
}
|
||||||
|
|
||||||
func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
||||||
return &Exporter{
|
return &Exporter{
|
||||||
Counters: NewCounterContainer(),
|
Counters: NewCounterContainer(),
|
||||||
Gauges: NewGaugeContainer(),
|
Gauges: NewGaugeContainer(),
|
||||||
Summaries: NewSummaryContainer(mapper),
|
Summaries: NewSummaryContainer(mapper),
|
||||||
Histograms: NewHistogramContainer(mapper),
|
Histograms: NewHistogramContainer(mapper),
|
||||||
mapper: mapper,
|
mapper: mapper,
|
||||||
|
labelValues: make(map[string]map[uint64]*LabelValues, 0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
282
exporter_test.go
282
exporter_test.go
|
@ -20,7 +20,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,22 +40,19 @@ func TestNegativeCounter(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
events := make(chan Events, 1)
|
events := make(chan Events, 0)
|
||||||
c := Events{
|
|
||||||
&CounterEvent{
|
|
||||||
metricName: "foo",
|
|
||||||
value: -1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
events <- c
|
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(time.Millisecond * 100)
|
c := Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: "foo",
|
||||||
|
value: -1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events <- c
|
||||||
close(events)
|
close(events)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
ex.Listen(events)
|
ex.Listen(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,34 +61,37 @@ func TestNegativeCounter(t *testing.T) {
|
||||||
// It sends the same tags first with a valid value, then with an invalid one.
|
// It sends the same tags first with a valid value, then with an invalid one.
|
||||||
// The exporter should not panic, but drop the invalid event
|
// The exporter should not panic, but drop the invalid event
|
||||||
func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if e := recover(); e != nil {
|
||||||
|
err := e.(error)
|
||||||
|
t.Fatalf("Exporter listener should not panic on bad utf8: %q", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
||||||
|
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
|
||||||
|
}
|
||||||
|
close(events)
|
||||||
|
}()
|
||||||
|
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
ex.Listen(events)
|
||||||
events := make(chan Events, 2)
|
|
||||||
|
|
||||||
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
|
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
close(events)
|
|
||||||
}()
|
|
||||||
|
|
||||||
ex.Listen(events)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type MockHistogram struct {
|
|
||||||
prometheus.Metric
|
|
||||||
prometheus.Collector
|
|
||||||
value float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *MockHistogram) Observe(n float64) {
|
|
||||||
h.value = n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHistogramUnits(t *testing.T) {
|
func TestHistogramUnits(t *testing.T) {
|
||||||
events := make(chan Events, 1)
|
// Start exporter with a synchronous channel
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
go func() {
|
||||||
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
|
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
|
||||||
|
ex.Listen(events)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Synchronously send a statsd event to wait for handleEvent execution.
|
||||||
|
// Then close events channel to stop a listener.
|
||||||
name := "foo"
|
name := "foo"
|
||||||
c := Events{
|
c := Events{
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
|
@ -98,22 +100,22 @@ func TestHistogramUnits(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
events <- c
|
events <- c
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
events <- Events{}
|
||||||
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
|
close(events)
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
// Check histogram value
|
||||||
go func() {
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
time.Sleep(time.Millisecond * 100)
|
if err != nil {
|
||||||
close(events)
|
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
||||||
}()
|
}
|
||||||
mock := &MockHistogram{}
|
value := getFloat64(metrics, name, prometheus.Labels{})
|
||||||
key := hashNameAndLabels(name, nil)
|
if value == nil {
|
||||||
ex.Histograms.Elements[key] = mock
|
t.Fatal("Histogram value should not be nil")
|
||||||
ex.Listen(events)
|
}
|
||||||
if mock.value == 300 {
|
if *value == 300 {
|
||||||
t.Fatalf("Histogram observations not scaled into Seconds")
|
t.Fatalf("Histogram observations not scaled into Seconds")
|
||||||
} else if mock.value != .300 {
|
} else if *value != .300 {
|
||||||
t.Fatalf("Received unexpected value for histogram observation %f != .300", mock.value)
|
t.Fatalf("Received unexpected value for histogram observation %f != .300", *value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,3 +175,183 @@ func TestEscapeMetricName(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTtlExpiration validates expiration of time series.
|
||||||
|
// foobar metric without mapping should expire with default ttl of 1s
|
||||||
|
// bazqux metric should expire with ttl of 2s
|
||||||
|
func TestTtlExpiration(t *testing.T) {
|
||||||
|
// Mock a time.NewTicker
|
||||||
|
tickerCh := make(chan time.Time, 0)
|
||||||
|
clock.ClockInstance = &clock.Clock{
|
||||||
|
TickerCh: tickerCh,
|
||||||
|
}
|
||||||
|
|
||||||
|
config := `
|
||||||
|
defaults:
|
||||||
|
ttl: 1s
|
||||||
|
mappings:
|
||||||
|
- match: bazqux.*
|
||||||
|
name: bazqux
|
||||||
|
ttl: 2s
|
||||||
|
`
|
||||||
|
// Create mapper from config and start an Exporter with a synchronous channel
|
||||||
|
testMapper := &mapper.MetricMapper{}
|
||||||
|
err := testMapper.InitFromYAMLString(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Config load error: %s %s", config, err)
|
||||||
|
}
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
defer close(events)
|
||||||
|
go func() {
|
||||||
|
ex := NewExporter(testMapper)
|
||||||
|
ex.Listen(events)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ev := Events{
|
||||||
|
// event with default ttl = 1s
|
||||||
|
&GaugeEvent{
|
||||||
|
metricName: "foobar",
|
||||||
|
value: 200,
|
||||||
|
},
|
||||||
|
// event with ttl = 2s from a mapping
|
||||||
|
&TimerEvent{
|
||||||
|
metricName: "bazqux.main",
|
||||||
|
value: 42,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var metrics []*dto.MetricFamily
|
||||||
|
var foobarValue *float64
|
||||||
|
var bazquxValue *float64
|
||||||
|
|
||||||
|
// Step 1. Send events with statsd metrics.
|
||||||
|
// Send empty Events to wait for events are handled.
|
||||||
|
// saveLabelValues will use fake instant as a lastRegisteredAt time.
|
||||||
|
clock.ClockInstance.Instant = time.Unix(0, 0)
|
||||||
|
events <- ev
|
||||||
|
events <- Events{}
|
||||||
|
|
||||||
|
// Check values
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if foobarValue == nil || bazquxValue == nil {
|
||||||
|
t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered")
|
||||||
|
}
|
||||||
|
if *foobarValue != 200 {
|
||||||
|
t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
|
||||||
|
}
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2. Increase Instant to emulate metrics expiration after 1s
|
||||||
|
clock.ClockInstance.Instant = time.Unix(1, 10)
|
||||||
|
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
|
||||||
|
events <- Events{}
|
||||||
|
|
||||||
|
// Check values
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if foobarValue != nil {
|
||||||
|
t.Fatalf("Gauge `foobar` should be expired")
|
||||||
|
}
|
||||||
|
if bazquxValue == nil {
|
||||||
|
t.Fatalf("Summary `bazqux` should be gathered")
|
||||||
|
}
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3. Increase Instant to emulate metrics expiration after 2s
|
||||||
|
clock.ClockInstance.Instant = time.Unix(2, 200)
|
||||||
|
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
|
||||||
|
events <- Events{}
|
||||||
|
|
||||||
|
// Check values
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if bazquxValue != nil {
|
||||||
|
t.Fatalf("Summary `bazqux` should be expired")
|
||||||
|
}
|
||||||
|
if foobarValue != nil {
|
||||||
|
t.Fatalf("Gauge `foobar` should not be gathered after expiration")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
|
||||||
|
// Method returns a value or nil if metric is not found.
|
||||||
|
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
|
||||||
|
var metricFamily *dto.MetricFamily
|
||||||
|
for _, m := range metrics {
|
||||||
|
if *m.Name == name {
|
||||||
|
metricFamily = m
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metricFamily == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var metric *dto.Metric
|
||||||
|
labelsHash := hashNameAndLabels(name, labels)
|
||||||
|
for _, m := range metricFamily.Metric {
|
||||||
|
h := hashNameAndLabels(name, labelPairsAsLabels(m.GetLabel()))
|
||||||
|
if h == labelsHash {
|
||||||
|
metric = m
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metric == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var value float64
|
||||||
|
if metric.Gauge != nil {
|
||||||
|
value = metric.Gauge.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Counter != nil {
|
||||||
|
value = metric.Counter.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Histogram != nil {
|
||||||
|
value = metric.Histogram.GetSampleSum()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Summary != nil {
|
||||||
|
value = metric.Summary.GetSampleSum()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Untyped != nil {
|
||||||
|
value = metric.Untyped.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
|
||||||
|
labels = prometheus.Labels{}
|
||||||
|
for _, pair := range pairs {
|
||||||
|
if pair.Name == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
value := ""
|
||||||
|
if pair.Value != nil {
|
||||||
|
value = *pair.Value
|
||||||
|
}
|
||||||
|
labels[*pair.Name] = value
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -14,6 +14,7 @@ require (
|
||||||
github.com/onsi/gomega v1.4.3 // indirect
|
github.com/onsi/gomega v1.4.3 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/prometheus/client_golang v0.9.2
|
github.com/prometheus/client_golang v0.9.2
|
||||||
|
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
|
||||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
||||||
github.com/sergi/go-diff v1.0.0 // indirect
|
github.com/sergi/go-diff v1.0.0 // indirect
|
||||||
github.com/sirupsen/logrus v1.0.3 // indirect
|
github.com/sirupsen/logrus v1.0.3 // indirect
|
||||||
|
|
28
pkg/clock/clock.go
Normal file
28
pkg/clock/clock.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package clock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ClockInstance *Clock
|
||||||
|
|
||||||
|
type Clock struct {
|
||||||
|
Instant time.Time
|
||||||
|
TickerCh chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func Now() time.Time {
|
||||||
|
if ClockInstance == nil {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
return ClockInstance.Instant
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTicker(d time.Duration) *time.Ticker {
|
||||||
|
if ClockInstance == nil || ClockInstance.TickerCh == nil {
|
||||||
|
return time.NewTicker(d)
|
||||||
|
}
|
||||||
|
return &time.Ticker{
|
||||||
|
C: ClockInstance.TickerCh,
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
|
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
|
||||||
yaml "gopkg.in/yaml.v2"
|
yaml "gopkg.in/yaml.v2"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -39,6 +40,7 @@ type mapperConfigDefaults struct {
|
||||||
Quantiles []metricObjective `yaml:"quantiles"`
|
Quantiles []metricObjective `yaml:"quantiles"`
|
||||||
MatchType MatchType `yaml:"match_type"`
|
MatchType MatchType `yaml:"match_type"`
|
||||||
GlobDisableOrdering bool `yaml:"glob_disable_ordering"`
|
GlobDisableOrdering bool `yaml:"glob_disable_ordering"`
|
||||||
|
Ttl time.Duration `yaml:"ttl"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricMapper struct {
|
type MetricMapper struct {
|
||||||
|
@ -69,6 +71,7 @@ type MetricMapping struct {
|
||||||
HelpText string `yaml:"help"`
|
HelpText string `yaml:"help"`
|
||||||
Action ActionType `yaml:"action"`
|
Action ActionType `yaml:"action"`
|
||||||
MatchMetricType MetricType `yaml:"match_metric_type"`
|
MatchMetricType MetricType `yaml:"match_metric_type"`
|
||||||
|
Ttl time.Duration `yaml:"ttl"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricObjective struct {
|
type metricObjective struct {
|
||||||
|
@ -177,6 +180,10 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
|
||||||
currentMapping.Quantiles = n.Defaults.Quantiles
|
currentMapping.Quantiles = n.Defaults.Quantiles
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if currentMapping.Ttl == 0 && n.Defaults.Ttl > 0 {
|
||||||
|
currentMapping.Ttl = n.Defaults.Ttl
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
|
|
|
@ -15,6 +15,7 @@ package mapper
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mappings map[string]struct {
|
type mappings map[string]struct {
|
||||||
|
@ -22,6 +23,7 @@ type mappings map[string]struct {
|
||||||
labels map[string]string
|
labels map[string]string
|
||||||
quantiles []metricObjective
|
quantiles []metricObjective
|
||||||
notPresent bool
|
notPresent bool
|
||||||
|
ttl time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetricMapperYAML(t *testing.T) {
|
func TestMetricMapperYAML(t *testing.T) {
|
||||||
|
@ -603,6 +605,66 @@ mappings:
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
// Config that has a ttl.
|
||||||
|
{
|
||||||
|
config: `mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
ttl: 10s
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Second * 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// Config that has a default ttl.
|
||||||
|
{
|
||||||
|
config: `defaults:
|
||||||
|
ttl: 1m2s
|
||||||
|
mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Minute + time.Second*2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// Config that override a default ttl.
|
||||||
|
{
|
||||||
|
config: `defaults:
|
||||||
|
ttl: 1m2s
|
||||||
|
mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
ttl: 5s
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Second * 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
mapper := MetricMapper{}
|
mapper := MetricMapper{}
|
||||||
|
@ -633,6 +695,9 @@ mappings:
|
||||||
t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels)
|
t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if mapping.ttl > 0 && mapping.ttl != m.Ttl {
|
||||||
|
t.Fatalf("%d.%q: Expected ttl of %s, got %s", i, metric, mapping.ttl.String(), m.Ttl.String())
|
||||||
|
}
|
||||||
|
|
||||||
if len(mapping.quantiles) != 0 {
|
if len(mapping.quantiles) != 0 {
|
||||||
if len(mapping.quantiles) != len(m.Quantiles) {
|
if len(mapping.quantiles) != len(m.Quantiles) {
|
||||||
|
|
Loading…
Reference in a new issue