diff --git a/README.md b/README.md index 529fbb1..11177c7 100644 --- a/README.md +++ b/README.md @@ -268,6 +268,7 @@ defaults: buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ] match_type: glob glob_disable_ordering: false + ttl: 0 # metrics do not expire mappings: # This will be a histogram using the buckets set in `defaults`. - match: test.timing.*.*.* @@ -350,6 +351,18 @@ mappings: Possible values for `match_metric_type` are `gauge`, `counter` and `timer`. +### Time series expiration + +The `ttl` parameter defines the expiration time for stale metrics. +Its value is a time duration with valid time units: "ns", "us" (or "µs"), +"ms", "s", "m", "h". For example, `ttl: 1m20s`. A value of `0` means that +metrics never expire. + +Expiration is useful when gathering metrics with changing label values from +dynamic environments like Kubernetes. For example, the metric +`ingress_nginx_upstream_retries_count` has a label `pod` whose value, the +pod name, changes over time. + ## Using Docker You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image. 
diff --git a/exporter.go b/exporter.go index b813590..6e10213 100644 --- a/exporter.go +++ b/exporter.go @@ -265,7 +265,7 @@ type Events []Event type LabelValues struct { lastRegisteredAt time.Time labels prometheus.Labels - ttl uint64 + ttl time.Duration } type Exporter struct { @@ -315,6 +315,9 @@ func (b *Exporter) handleEvent(event Event) { mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType()) if mapping == nil { mapping = &mapper.MetricMapping{} + if b.mapper.Defaults.Ttl != 0 { + mapping.Ttl = b.mapper.Defaults.Ttl + } } if mapping.Action == mapper.ActionTypeDrop { @@ -443,7 +446,7 @@ func (b *Exporter) removeStaleMetrics() { if lvs.ttl == 0 { continue } - if lvs.lastRegisteredAt.Add(time.Duration(lvs.ttl) * time.Second).Before(now) { + if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) { b.Counters.Delete(metricName, lvs.labels) b.Gauges.Delete(metricName, lvs.labels) b.Summaries.Delete(metricName, lvs.labels) @@ -455,7 +458,7 @@ func (b *Exporter) removeStaleMetrics() { } // saveLabelValues stores label values set to labelValues and update lastRegisteredAt time -func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl uint64) { +func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) { _, hasMetric := b.labelValues[metricName] if !hasMetric { b.labelValues[metricName] = make(map[uint64]*LabelValues) diff --git a/exporter_test.go b/exporter_test.go index 64f3206..eafe8de 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -100,20 +100,15 @@ func TestHistogramUnits(t *testing.T) { ex.Listen(events) - histogram, err := ex.Histograms.Get(name, prometheus.Labels{}, "", nil) + metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { - t.Fatalf("Histogram not registered") + t.Fatalf("Cannot gather from DefaultGatherer: %v", err) } - - // check the state of the histogram by - // (ab)using its Write method (which is usually only used by Prometheus 
internally). - metric := &dto.Metric{} - histogram.Write(metric) - value := *metric.Histogram.SampleSum - if value == 300 { + value := getFloat64(metrics, name, prometheus.Labels{}) + if *value == 300 { t.Fatalf("Histogram observations not scaled into Seconds") - } else if value != .300 { - t.Fatalf("Received unexpected value for histogram observation %f != .300", value) + } else if *value != .300 { + t.Fatalf("Received unexpected value for histogram observation %f != .300", *value) } } @@ -173,3 +168,232 @@ func TestEscapeMetricName(t *testing.T) { } } }
+
+// TestTtlExpiration validates expiration of time series.
+// The foobar metric has no mapping and should expire with the default ttl of 1s;
+// the bazqux metric should expire with its mapping ttl of 2s.
+func TestTtlExpiration(t *testing.T) {
+	config := `
+defaults:
+  ttl: 1s
+mappings:
+- match: bazqux.*
+  name: bazqux
+  labels:
+    first: baz
+    second: qux
+    third: $1
+  ttl: 2s
+`
+
+	bazquxLabels := prometheus.Labels{
+		"third":  "main",
+		"first":  "baz",
+		"second": "qux",
+	}
+
+	testMapper := &mapper.MetricMapper{}
+	err := testMapper.InitFromYAMLString(config)
+	if err != nil {
+		t.Fatalf("Config load error: %s %s", config, err)
+	}
+
+	ex := NewExporter(testMapper)
+	for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
+		events := make(chan Events, 2)
+		fatal := make(chan error, 1) // t.Fatal must not be called in goroutines (SA2002)
+		stop := make(chan bool, 1)
+
+		l.handlePacket([]byte("foobar:200|g"), events)
+		l.handlePacket([]byte("bazqux.main:42|ms"), events)
+
+		// Run the checks in a goroutine and close the channel when they are done to
+		// signify we are finished with the listener. Closing fatal as well lets the test
+		// body wait for the verdict: it receives the queued error, or nil if none was sent.
+		go func() {
+			defer close(events)
+			defer close(fatal)
+
+			time.Sleep(time.Millisecond * 100)
+
+			var foobarValue *float64
+			var bazquxValue *float64
+
+			// gather refreshes both values; getFloat64 yields nil for a series that is not exported.
+			// Its err is local, so the goroutine no longer writes the err of the test body.
+			gather := func() error {
+				metrics, err := prometheus.DefaultGatherer.Gather()
+				if err != nil {
+					return fmt.Errorf("Cannot gather from DefaultGatherer: %v", err)
+				}
+				foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
+				bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
+				return nil
+			}
+
+			// Wait to gather both metrics
+			var tries = 7
+			for {
+				if err := gather(); err != nil {
+					fatal <- err
+					return
+				}
+				if foobarValue != nil && bazquxValue != nil {
+					break
+				}
+
+				tries--
+				if tries == 0 {
+					fatal <- fmt.Errorf("Gauge `foobar` and Summary `bazqux` should be gathered")
+					return
+				}
+				time.Sleep(time.Millisecond * 100)
+			}
+
+			// Check values
+			if *foobarValue != 200 {
+				fatal <- fmt.Errorf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
+				return
+			}
+			if *bazquxValue != 42 {
+				fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
+				return
+			}
+
+			// Wait for expiration of foobar
+			tries = 20 // 20*100 = 2s
+			for {
+				time.Sleep(time.Millisecond * 100)
+				if err := gather(); err != nil {
+					fatal <- err
+					return
+				}
+				if foobarValue == nil {
+					break
+				}
+
+				tries--
+				if tries == 0 {
+					fatal <- fmt.Errorf("Gauge `foobar` should be expired")
+					return
+				}
+			}
+
+			// bazqux has the longer ttl and must still be exported; report a failure
+			// instead of dereferencing nil if it is already gone.
+			if bazquxValue == nil {
+				fatal <- fmt.Errorf("Summary `bazqux` should not be expired before its ttl")
+				return
+			}
+			if *bazquxValue != 42 {
+				fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
+				return
+			}
+
+			// Wait for expiration of bazqux
+			tries = 20 // 20*100 = 2s
+			for {
+				time.Sleep(time.Millisecond * 100)
+				if err := gather(); err != nil {
+					fatal <- err
+					return
+				}
+				if bazquxValue == nil {
+					break
+				}
+				if foobarValue != nil {
+					fatal <- fmt.Errorf("Gauge `foobar` should not be gathered after expiration")
+					return
+				}
+
+				tries--
+				if tries == 0 {
+					fatal <- fmt.Errorf("Summary `bazqux` should be expired")
+					return
+				}
+			}
+		}()
+
+		go func() {
+			ex.Listen(events)
+			stop <- true
+		}()
+
+		// The receive ends when the checker sends an error or closes fatal, so a failure
+		// cannot be lost to a concurrent stop signal.
+		if err := <-fatal; err != nil {
+			t.Fatalf("%v", err)
+		}
+		// Wait for Listen to finish, then go on with the next listener instead of
+		// returning after the first one.
+		<-stop
+	}
+}
+
+// getFloat64 looks up the metric family called name and, within it, the metric whose labels match.
+// It returns a pointer to the metric's value, or nil if no such family or metric is found.
+// For histograms and summaries the value is the sample sum.
+func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
+	var metricFamily *dto.MetricFamily
+	for _, m := range metrics {
+		if m.GetName() == name {
+			metricFamily = m
+			break
+		}
+	}
+	if metricFamily == nil {
+		return nil
+	}
+
+	var metric *dto.Metric
+	labelsHash := hashNameAndLabels(name, labels)
+	for _, m := range metricFamily.Metric {
+		h := hashNameAndLabels(name, labelPairsAsLabels(m.GetLabel()))
+		if h == labelsHash {
+			metric = m
+			break
+		}
+	}
+	if metric == nil {
+		return nil
+	}
+
+	var value float64
+	if metric.Gauge != nil {
+		value = metric.Gauge.GetValue()
+		return &value
+	}
+	if metric.Counter != nil {
+		value = metric.Counter.GetValue()
+		return &value
+	}
+	if metric.Histogram != nil {
+		value = metric.Histogram.GetSampleSum()
+		return &value
+	}
+	if metric.Summary != nil {
+		value = metric.Summary.GetSampleSum()
+		return &value
+	}
+	if metric.Untyped != nil {
+		value = metric.Untyped.GetValue()
+		return &value
+	}
+	panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
+}
+
+// labelPairsAsLabels converts label pairs to prometheus.Labels; pairs without a name are skipped and a missing value becomes "".
+func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
+	labels = prometheus.Labels{}
+	for _, pair := range pairs {
+		if pair.Name == nil {
+			continue
+		}
+		value := ""
+		if pair.Value != nil {
+			value = *pair.Value
+		}
+		labels[*pair.Name] = value
+	}
+	return
+}
diff --git a/go.mod b/go.mod index b69800f..488a957 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/onsi/gomega v1.4.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v0.9.2 + github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/sergi/go-diff v1.0.0 // indirect github.com/sirupsen/logrus v1.0.3 // indirect diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 621f5a2..b9fe3d7 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper/fsm" yaml "gopkg.in/yaml.v2" + "time" ) var ( @@ -39,7 +40,7 @@ type mapperConfigDefaults struct { Quantiles []metricObjective `yaml:"quantiles"` MatchType MatchType `yaml:"match_type"` GlobDisableOrdering bool `yaml:"glob_disable_ordering"` - Ttl uint64 `yaml:"ttl"` + Ttl time.Duration `yaml:"ttl"` } type MetricMapper struct { @@ -70,7 +71,7 @@ type MetricMapping struct { HelpText string `yaml:"help"` Action ActionType `yaml:"action"` MatchMetricType MetricType `yaml:"match_metric_type"` - Ttl uint64 `yaml:"ttl"` + Ttl time.Duration `yaml:"ttl"` } type metricObjective struct { diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 08e4306..8d1ea49 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -15,6 +15,7 @@ package mapper import ( "testing" + "time" ) type mappings map[string]struct { @@ -22,6 +23,7 @@ type mappings map[string]struct { labels map[string]string quantiles
[]metricObjective notPresent bool + ttl time.Duration } func TestMetricMapperYAML(t *testing.T) { @@ -603,6 +605,66 @@ mappings: }, }, }, + // Config that has a ttl. + { + config: `mappings: +- match: web.* + name: "web" + ttl: 10s + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Second * 10, + }, + }, + }, + // Config that has a default ttl. + { + config: `defaults: + ttl: 1m2s +mappings: +- match: web.* + name: "web" + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Minute + time.Second*2, + }, + }, + }, + // Config that override a default ttl. + { + config: `defaults: + ttl: 1m2s +mappings: +- match: web.* + name: "web" + ttl: 5s + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Second * 5, + }, + }, + }, } mapper := MetricMapper{} @@ -633,6 +695,9 @@ mappings: t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels) } } + if mapping.ttl > 0 && mapping.ttl != m.Ttl { + t.Fatalf("%d.%q: Expected ttl of %s, got %s", i, metric, mapping.ttl.String(), m.Ttl.String()) + } if len(mapping.quantiles) != 0 { if len(mapping.quantiles) != len(m.Quantiles) {