mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-11-26 17:21:01 +00:00
TTL expiration tests, README update
Signed-off-by: Ivan Mikheykin <ivan.mikheykin@flant.com>
This commit is contained in:
parent
57495db281
commit
d1b2dd47a8
6 changed files with 302 additions and 16 deletions
13
README.md
13
README.md
|
@ -268,6 +268,7 @@ defaults:
|
||||||
buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ]
|
buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ]
|
||||||
match_type: glob
|
match_type: glob
|
||||||
glob_disable_ordering: false
|
glob_disable_ordering: false
|
||||||
|
ttl: 0 # metrics not expired
|
||||||
mappings:
|
mappings:
|
||||||
# This will be a histogram using the buckets set in `defaults`.
|
# This will be a histogram using the buckets set in `defaults`.
|
||||||
- match: test.timing.*.*.*
|
- match: test.timing.*.*.*
|
||||||
|
@ -350,6 +351,18 @@ mappings:
|
||||||
|
|
||||||
Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.
|
Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.
|
||||||
|
|
||||||
|
### Time series expiration
|
||||||
|
|
||||||
|
`ttl` parameter can be used to define expiration time for stale metrics.
|
||||||
|
Value is a time duration with valid time units: "ns", "us" (or "µs"),
|
||||||
|
"ms", "s", "m", "h". For example, `ttl: 1m20s`. `0` value is used to indicate
|
||||||
|
not expired metrics.
|
||||||
|
|
||||||
|
Expiration is useful to gather metrics with changing label values from
|
||||||
|
dynamic environments like Kubernetes. For example, metric
|
||||||
|
`ingress_nginx_upstream_retries_count` with label `pod` and
|
||||||
|
changing pod name as a value for this label.
|
||||||
|
|
||||||
## Using Docker
|
## Using Docker
|
||||||
|
|
||||||
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.
|
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.
|
||||||
|
|
|
@ -265,7 +265,7 @@ type Events []Event
|
||||||
type LabelValues struct {
|
type LabelValues struct {
|
||||||
lastRegisteredAt time.Time
|
lastRegisteredAt time.Time
|
||||||
labels prometheus.Labels
|
labels prometheus.Labels
|
||||||
ttl uint64
|
ttl time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exporter struct {
|
type Exporter struct {
|
||||||
|
@ -315,6 +315,9 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
|
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
|
||||||
if mapping == nil {
|
if mapping == nil {
|
||||||
mapping = &mapper.MetricMapping{}
|
mapping = &mapper.MetricMapping{}
|
||||||
|
if b.mapper.Defaults.Ttl != 0 {
|
||||||
|
mapping.Ttl = b.mapper.Defaults.Ttl
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if mapping.Action == mapper.ActionTypeDrop {
|
if mapping.Action == mapper.ActionTypeDrop {
|
||||||
|
@ -443,7 +446,7 @@ func (b *Exporter) removeStaleMetrics() {
|
||||||
if lvs.ttl == 0 {
|
if lvs.ttl == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if lvs.lastRegisteredAt.Add(time.Duration(lvs.ttl) * time.Second).Before(now) {
|
if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) {
|
||||||
b.Counters.Delete(metricName, lvs.labels)
|
b.Counters.Delete(metricName, lvs.labels)
|
||||||
b.Gauges.Delete(metricName, lvs.labels)
|
b.Gauges.Delete(metricName, lvs.labels)
|
||||||
b.Summaries.Delete(metricName, lvs.labels)
|
b.Summaries.Delete(metricName, lvs.labels)
|
||||||
|
@ -455,7 +458,7 @@ func (b *Exporter) removeStaleMetrics() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time
|
// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time
|
||||||
func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl uint64) {
|
func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) {
|
||||||
_, hasMetric := b.labelValues[metricName]
|
_, hasMetric := b.labelValues[metricName]
|
||||||
if !hasMetric {
|
if !hasMetric {
|
||||||
b.labelValues[metricName] = make(map[uint64]*LabelValues)
|
b.labelValues[metricName] = make(map[uint64]*LabelValues)
|
||||||
|
|
225
exporter_test.go
225
exporter_test.go
|
@ -100,20 +100,15 @@ func TestHistogramUnits(t *testing.T) {
|
||||||
|
|
||||||
ex.Listen(events)
|
ex.Listen(events)
|
||||||
|
|
||||||
histogram, err := ex.Histograms.Get(name, prometheus.Labels{}, "", nil)
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Histogram not registered")
|
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
||||||
}
|
}
|
||||||
|
value := getFloat64(metrics, name, prometheus.Labels{})
|
||||||
// check the state of the histogram by
|
if *value == 300 {
|
||||||
// (ab)using its Write method (which is usually only used by Prometheus internally).
|
|
||||||
metric := &dto.Metric{}
|
|
||||||
histogram.Write(metric)
|
|
||||||
value := *metric.Histogram.SampleSum
|
|
||||||
if value == 300 {
|
|
||||||
t.Fatalf("Histogram observations not scaled into Seconds")
|
t.Fatalf("Histogram observations not scaled into Seconds")
|
||||||
} else if value != .300 {
|
} else if *value != .300 {
|
||||||
t.Fatalf("Received unexpected value for histogram observation %f != .300", value)
|
t.Fatalf("Received unexpected value for histogram observation %f != .300", *value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,3 +168,211 @@ func TestEscapeMetricName(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTtlExpiration validates expiration of time series.
|
||||||
|
// foobar metric without mapping should expire with default ttl of 1s
|
||||||
|
// bazqux metric should expire with ttl of 2s
|
||||||
|
func TestTtlExpiration(t *testing.T) {
|
||||||
|
config := `
|
||||||
|
defaults:
|
||||||
|
ttl: 1s
|
||||||
|
mappings:
|
||||||
|
- match: bazqux.*
|
||||||
|
name: bazqux
|
||||||
|
labels:
|
||||||
|
first: baz
|
||||||
|
second: qux
|
||||||
|
third: $1
|
||||||
|
ttl: 2s
|
||||||
|
`
|
||||||
|
|
||||||
|
bazquxLabels := prometheus.Labels{
|
||||||
|
"third": "main",
|
||||||
|
"first": "baz",
|
||||||
|
"second": "qux",
|
||||||
|
}
|
||||||
|
|
||||||
|
testMapper := &mapper.MetricMapper{}
|
||||||
|
err := testMapper.InitFromYAMLString(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Config load error: %s %s", config, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ex := NewExporter(testMapper)
|
||||||
|
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
||||||
|
events := make(chan Events, 2)
|
||||||
|
fatal := make(chan error, 1) // t.Fatal must not be called in goroutines (SA2002)
|
||||||
|
stop := make(chan bool, 1)
|
||||||
|
|
||||||
|
l.handlePacket([]byte("foobar:200|g"), events)
|
||||||
|
l.handlePacket([]byte("bazqux.main:42|ms"), events)
|
||||||
|
|
||||||
|
// Close channel to signify we are done with the listener after a short period.
|
||||||
|
go func() {
|
||||||
|
defer close(events)
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
|
||||||
|
var metrics []*dto.MetricFamily
|
||||||
|
var foobarValue *float64
|
||||||
|
var bazquxValue *float64
|
||||||
|
|
||||||
|
// Wait to gather both metrics
|
||||||
|
var tries = 7
|
||||||
|
for {
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
||||||
|
if foobarValue != nil && bazquxValue != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
tries--
|
||||||
|
if tries == 0 {
|
||||||
|
fatal <- fmt.Errorf("Gauge `foobar` and Summary `bazqux` should be gathered")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check values
|
||||||
|
if *foobarValue != 200 {
|
||||||
|
fatal <- fmt.Errorf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for expiration of foobar
|
||||||
|
tries = 20 // 20*100 = 2s
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
||||||
|
if foobarValue == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
tries--
|
||||||
|
if tries == 0 {
|
||||||
|
fatal <- fmt.Errorf("Gauge `foobar` should be expired")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for expiration of bazqux
|
||||||
|
tries = 20 // 20*100 = 2s
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
||||||
|
if bazquxValue == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if foobarValue != nil {
|
||||||
|
fatal <- fmt.Errorf("Gauge `foobar` should not be gathered after expiration")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tries--
|
||||||
|
if tries == 0 {
|
||||||
|
fatal <- fmt.Errorf("Summary `bazqux` should be expired")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ex.Listen(events)
|
||||||
|
stop <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-fatal:
|
||||||
|
t.Fatalf("%v", err)
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
|
||||||
|
// Method returns a value or nil if metric is not found.
|
||||||
|
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
|
||||||
|
var metricFamily *dto.MetricFamily
|
||||||
|
for _, m := range metrics {
|
||||||
|
if *m.Name == name {
|
||||||
|
metricFamily = m
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metricFamily == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var metric *dto.Metric
|
||||||
|
labelsHash := hashNameAndLabels(name, labels)
|
||||||
|
for _, m := range metricFamily.Metric {
|
||||||
|
h := hashNameAndLabels(name, labelPairsAsLabels(m.GetLabel()))
|
||||||
|
if h == labelsHash {
|
||||||
|
metric = m
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metric == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var value float64
|
||||||
|
if metric.Gauge != nil {
|
||||||
|
value = metric.Gauge.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Counter != nil {
|
||||||
|
value = metric.Counter.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Histogram != nil {
|
||||||
|
value = metric.Histogram.GetSampleSum()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Summary != nil {
|
||||||
|
value = metric.Summary.GetSampleSum()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
if metric.Untyped != nil {
|
||||||
|
value = metric.Untyped.GetValue()
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
|
||||||
|
labels = prometheus.Labels{}
|
||||||
|
for _, pair := range pairs {
|
||||||
|
if pair.Name == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
value := ""
|
||||||
|
if pair.Value != nil {
|
||||||
|
value = *pair.Value
|
||||||
|
}
|
||||||
|
labels[*pair.Name] = value
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -14,6 +14,7 @@ require (
|
||||||
github.com/onsi/gomega v1.4.3 // indirect
|
github.com/onsi/gomega v1.4.3 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/prometheus/client_golang v0.9.2
|
github.com/prometheus/client_golang v0.9.2
|
||||||
|
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
|
||||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
||||||
github.com/sergi/go-diff v1.0.0 // indirect
|
github.com/sergi/go-diff v1.0.0 // indirect
|
||||||
github.com/sirupsen/logrus v1.0.3 // indirect
|
github.com/sirupsen/logrus v1.0.3 // indirect
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
|
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
|
||||||
yaml "gopkg.in/yaml.v2"
|
yaml "gopkg.in/yaml.v2"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -39,7 +40,7 @@ type mapperConfigDefaults struct {
|
||||||
Quantiles []metricObjective `yaml:"quantiles"`
|
Quantiles []metricObjective `yaml:"quantiles"`
|
||||||
MatchType MatchType `yaml:"match_type"`
|
MatchType MatchType `yaml:"match_type"`
|
||||||
GlobDisableOrdering bool `yaml:"glob_disable_ordering"`
|
GlobDisableOrdering bool `yaml:"glob_disable_ordering"`
|
||||||
Ttl uint64 `yaml:"ttl"`
|
Ttl time.Duration `yaml:"ttl"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricMapper struct {
|
type MetricMapper struct {
|
||||||
|
@ -70,7 +71,7 @@ type MetricMapping struct {
|
||||||
HelpText string `yaml:"help"`
|
HelpText string `yaml:"help"`
|
||||||
Action ActionType `yaml:"action"`
|
Action ActionType `yaml:"action"`
|
||||||
MatchMetricType MetricType `yaml:"match_metric_type"`
|
MatchMetricType MetricType `yaml:"match_metric_type"`
|
||||||
Ttl uint64 `yaml:"ttl"`
|
Ttl time.Duration `yaml:"ttl"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricObjective struct {
|
type metricObjective struct {
|
||||||
|
|
|
@ -15,6 +15,7 @@ package mapper
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mappings map[string]struct {
|
type mappings map[string]struct {
|
||||||
|
@ -22,6 +23,7 @@ type mappings map[string]struct {
|
||||||
labels map[string]string
|
labels map[string]string
|
||||||
quantiles []metricObjective
|
quantiles []metricObjective
|
||||||
notPresent bool
|
notPresent bool
|
||||||
|
ttl time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetricMapperYAML(t *testing.T) {
|
func TestMetricMapperYAML(t *testing.T) {
|
||||||
|
@ -603,6 +605,66 @@ mappings:
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
// Config that has a ttl.
|
||||||
|
{
|
||||||
|
config: `mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
ttl: 10s
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Second * 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// Config that has a default ttl.
|
||||||
|
{
|
||||||
|
config: `defaults:
|
||||||
|
ttl: 1m2s
|
||||||
|
mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Minute + time.Second*2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// Config that override a default ttl.
|
||||||
|
{
|
||||||
|
config: `defaults:
|
||||||
|
ttl: 1m2s
|
||||||
|
mappings:
|
||||||
|
- match: web.*
|
||||||
|
name: "web"
|
||||||
|
ttl: 5s
|
||||||
|
labels:
|
||||||
|
site: "$1"`,
|
||||||
|
mappings: mappings{
|
||||||
|
"test.a": {},
|
||||||
|
"web.localhost": {
|
||||||
|
name: "web",
|
||||||
|
labels: map[string]string{
|
||||||
|
"site": "localhost",
|
||||||
|
},
|
||||||
|
ttl: time.Second * 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
mapper := MetricMapper{}
|
mapper := MetricMapper{}
|
||||||
|
@ -633,6 +695,9 @@ mappings:
|
||||||
t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels)
|
t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if mapping.ttl > 0 && mapping.ttl != m.Ttl {
|
||||||
|
t.Fatalf("%d.%q: Expected ttl of %s, got %s", i, metric, mapping.ttl.String(), m.Ttl.String())
|
||||||
|
}
|
||||||
|
|
||||||
if len(mapping.quantiles) != 0 {
|
if len(mapping.quantiles) != 0 {
|
||||||
if len(mapping.quantiles) != len(m.Quantiles) {
|
if len(mapping.quantiles) != len(m.Quantiles) {
|
||||||
|
|
Loading…
Reference in a new issue