From b638b9d808f01680aaa0ad4d528628ed315c7938 Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Tue, 27 Nov 2018 15:41:04 +0300 Subject: [PATCH 1/8] Replace Metrics with Collectors - use MetricVec family instead of Metric - dynamic label values instead of ConstLabels - use dto.Metric to gain histrogram value in exporter_test - remove hash calculations Signed-off-by: Ivan Mikheykin --- exporter.go | 122 ++++++++++++++++++++--------------------------- exporter_test.go | 32 ++++++------- 2 files changed, 67 insertions(+), 87 deletions(-) diff --git a/exporter.go b/exporter.go index b227516..ad53ffe 100644 --- a/exporter.go +++ b/exporter.go @@ -15,20 +15,17 @@ package main import ( "bufio" - "bytes" - "encoding/binary" "fmt" - "hash/fnv" "io" "net" "regexp" + "sort" "strconv" "strings" "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" - "github.com/prometheus/common/model" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -42,96 +39,82 @@ const ( var ( illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) - - hash = fnv.New64a() - strBuf bytes.Buffer // Used for hashing. - intBuf = make([]byte, 8) ) -// hashNameAndLabels returns a hash value of the provided name string and all -// the label names and values in the provided labels map. -// -// Not safe for concurrent use! (Uses a shared buffer and hasher to save on -// allocations.) -func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { - hash.Reset() - strBuf.Reset() - strBuf.WriteString(name) - hash.Write(strBuf.Bytes()) - binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) - hash.Write(intBuf) - return hash.Sum64() +func labelsNames(labels prometheus.Labels) []string { + names := make([]string, 0, len(labels)) + for labelName := range labels { + names = append(names, labelName) + } + sort.Strings(names) + return names } type CounterContainer struct { - Elements map[uint64]prometheus.Counter + // metric name + Elements map[string]*prometheus.CounterVec } func NewCounterContainer() *CounterContainer { return &CounterContainer{ - Elements: make(map[uint64]prometheus.Counter), + Elements: make(map[string]*prometheus.CounterVec), } } func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Counter, error) { - hash := hashNameAndLabels(metricName, labels) - counter, ok := c.Elements[hash] + counterVec, ok := c.Elements[metricName] if !ok { - counter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: metricName, - Help: help, - ConstLabels: labels, - }) - if err := prometheus.Register(counter); err != nil { + counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: help, + }, labelsNames(labels)) + if err := prometheus.Register(counterVec); err != nil { return nil, err } - c.Elements[hash] = counter + c.Elements[metricName] = counterVec } - return counter, nil + return counterVec.GetMetricWith(labels) } type GaugeContainer struct { - Elements map[uint64]prometheus.Gauge + Elements map[string]*prometheus.GaugeVec } func NewGaugeContainer() *GaugeContainer { return &GaugeContainer{ - Elements: make(map[uint64]prometheus.Gauge), + Elements: make(map[string]*prometheus.GaugeVec), } } func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help string) (prometheus.Gauge, error) { - hash := hashNameAndLabels(metricName, labels) - gauge, ok := c.Elements[hash] + gaugeVec, ok := c.Elements[metricName] if !ok { - gauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: metricName, - Help: help, - ConstLabels: labels, - }) - if err := prometheus.Register(gauge); err != nil { + gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: metricName, + Help: help, + }, labelsNames(labels)) + if err := prometheus.Register(gaugeVec); err != nil { return nil, err } - c.Elements[hash] = gauge + c.Elements[metricName] = gaugeVec } - return gauge, nil + return gaugeVec.GetMetricWith(labels) } type SummaryContainer struct { - Elements map[uint64]prometheus.Summary + Elements map[string]*prometheus.SummaryVec mapper *mapper.MetricMapper } func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer { return &SummaryContainer{ - Elements: make(map[uint64]prometheus.Summary), + Elements: make(map[string]*prometheus.SummaryVec), mapper: mapper, } } func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Summary, error) { - hash := hashNameAndLabels(metricName, labels) - summary, ok := c.Elements[hash] + summaryVec, ok := c.Elements[metricName] if !ok { quantiles := c.mapper.Defaults.Quantiles if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 { @@ -141,54 +124,51 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help for _, q := range quantiles { objectives[q.Quantile] = q.Error } - summary = prometheus.NewSummary( + summaryVec = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Name: metricName, - Help: help, - ConstLabels: labels, - Objectives: objectives, - }) - if err := prometheus.Register(summary); err != nil { + Name: metricName, + Help: help, + Objectives: objectives, + }, labelsNames(labels)) + if err := prometheus.Register(summaryVec); err != nil { return nil, err } - c.Elements[hash] = summary + c.Elements[metricName] = summaryVec } - return summary, nil + return summaryVec.GetMetricWith(labels) } type HistogramContainer struct { - Elements map[uint64]prometheus.Histogram + Elements map[string]*prometheus.HistogramVec mapper *mapper.MetricMapper } func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer { return &HistogramContainer{ - Elements: make(map[uint64]prometheus.Histogram), + Elements: make(map[string]*prometheus.HistogramVec), mapper: mapper, } } func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Histogram, error) { - hash := hashNameAndLabels(metricName, labels) - histogram, ok := c.Elements[hash] + histogramVec, ok := c.Elements[metricName] if !ok { buckets := c.mapper.Defaults.Buckets if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 { buckets = mapping.Buckets } - histogram = prometheus.NewHistogram( + histogramVec := prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: metricName, - Help: help, - ConstLabels: labels, - Buckets: buckets, - }) - c.Elements[hash] = histogram - if err := prometheus.Register(histogram); err != nil { + Name: metricName, + Help: help, + Buckets: buckets, + }, labelsNames(labels)) + if err := prometheus.Register(histogramVec); err != nil { return nil, err } + c.Elements[metricName] = histogramVec } - return histogram, nil + return histogramVec.GetMetricWith(labels) } type Event interface { diff --git a/exporter_test.go b/exporter_test.go index 0722240..64f3206 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -78,16 +79,6 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { } } -type MockHistogram struct { - prometheus.Metric - prometheus.Collector - value float64 -} - -func (h *MockHistogram) Observe(n float64) { - h.value = n -} - func TestHistogramUnits(t *testing.T) { events := make(chan Events, 1) name := "foo" @@ -106,14 +97,23 @@ func TestHistogramUnits(t *testing.T) { time.Sleep(time.Millisecond * 100) close(events) }() - mock := &MockHistogram{} - key := hashNameAndLabels(name, nil) - ex.Histograms.Elements[key] = mock + ex.Listen(events) - if mock.value == 300 { + + histogram, err := ex.Histograms.Get(name, prometheus.Labels{}, "", nil) + if err != nil { + t.Fatalf("Histogram not registered") + } + + // check the state of the histogram by + // (ab)using its Write method (which is usually only used by Prometheus internally). + metric := &dto.Metric{} + histogram.Write(metric) + value := *metric.Histogram.SampleSum + if value == 300 { t.Fatalf("Histogram observations not scaled into Seconds") - } else if mock.value != .300 { - t.Fatalf("Received unexpected value for histogram observation %f != .300", mock.value) + } else if value != .300 { + t.Fatalf("Received unexpected value for histogram observation %f != .300", value) } } From b4e29c5f182866ec1020af63c3a6df9d721715df Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Tue, 27 Nov 2018 15:53:45 +0300 Subject: [PATCH 2/8] Remove stale timeseries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ttl is hardcoded — should be in mapping.yaml - works with metrics without labels Signed-off-by: Ivan Mikheykin --- exporter.go | 137 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 120 insertions(+), 17 deletions(-) diff --git a/exporter.go b/exporter.go index ad53ffe..6ec1d6c 100644 --- a/exporter.go +++ b/exporter.go @@ -15,17 +15,22 @@ package main import ( "bufio" + "bytes" + "encoding/binary" "fmt" + "hash/fnv" "io" "net" "regexp" "sort" "strconv" "strings" + "time" "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" + "github.com/prometheus/common/model" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -37,8 +42,15 @@ const ( "consider the effects on your monitoring setup. Error: %s" ) +// TODO move to mapping config +const metricTtl = time.Duration(5 * time.Second) + var ( illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) + + hash = fnv.New64a() + strBuf bytes.Buffer // Used for hashing. + intBuf = make([]byte, 8) ) func labelsNames(labels prometheus.Labels) []string { @@ -50,6 +62,21 @@ func labelsNames(labels prometheus.Labels) []string { return names } +// hashNameAndLabels returns a hash value of the provided name string and all +// the label names and values in the provided labels map. +// +// Not safe for concurrent use! (Uses a shared buffer and hasher to save on +// allocations.) +func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { + hash.Reset() + strBuf.Reset() + strBuf.WriteString(name) + hash.Write(strBuf.Bytes()) + binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) + hash.Write(intBuf) + return hash.Sum64() +} + type CounterContainer struct { // metric name Elements map[string]*prometheus.CounterVec @@ -76,6 +103,12 @@ func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help return counterVec.GetMetricWith(labels) } +func (c *CounterContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type GaugeContainer struct { Elements map[string]*prometheus.GaugeVec } @@ -101,6 +134,12 @@ func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help s return gaugeVec.GetMetricWith(labels) } +func (c *GaugeContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type SummaryContainer struct { Elements map[string]*prometheus.SummaryVec mapper *mapper.MetricMapper @@ -138,6 +177,12 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help return summaryVec.GetMetricWith(labels) } +func (c *SummaryContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type HistogramContainer struct { Elements map[string]*prometheus.HistogramVec mapper *mapper.MetricMapper @@ -171,6 +216,12 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he return histogramVec.GetMetricWith(labels) } +func (c *HistogramContainer) Delete(metricName string, labels prometheus.Labels) { + if _, ok := c.Elements[metricName]; ok { + c.Elements[metricName].Delete(labels) + } +} + type Event interface { MetricName() string Value() float64 @@ -214,12 +265,18 @@ func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTi type Events []Event +type LabelValues struct { + lastRegisteredAt time.Time + labels prometheus.Labels +} + type Exporter struct { - Counters *CounterContainer - Gauges *GaugeContainer - Summaries *SummaryContainer - Histograms *HistogramContainer - mapper *mapper.MetricMapper + Counters *CounterContainer + Gauges *GaugeContainer + Summaries *SummaryContainer + Histograms *HistogramContainer + mapper *mapper.MetricMapper + labelValues map[string]map[uint64]*LabelValues } func escapeMetricName(metricName string) string { @@ -236,14 +293,21 @@ func escapeMetricName(metricName string) string { // Listen handles all events sent to the given channel sequentially. It // terminates when the channel is closed. func (b *Exporter) Listen(e <-chan Events) { + removeStaleMetricsTicker := time.NewTicker(time.Second) + for { - events, ok := <-e - if !ok { - log.Debug("Channel is closed. Break out of Exporter.Listener.") - return - } - for _, event := range events { - b.handleEvent(event) + select { + case <-removeStaleMetricsTicker.C: + b.removeStaleMetrics() + case events, ok := <-e: + if !ok { + log.Debug("Channel is closed. Break out of Exporter.Listener.") + removeStaleMetricsTicker.Stop() + return + } + for _, event := range events { + b.handleEvent(event) + } } } } @@ -293,6 +357,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { counter.Add(event.Value()) + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("counter").Inc() } else { @@ -313,6 +378,7 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("gauge").Inc() } else { @@ -339,6 +405,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -354,6 +421,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { summary.Observe(event.Value()) + b.saveLabelValues(metricName, prometheusLabels) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -370,13 +438,48 @@ func (b *Exporter) handleEvent(event Event) { } } +// removeStaleMetrics removes label values set from metric with stale values +func (b *Exporter) removeStaleMetrics() { + now := time.Now() + // delete timeseries with expired ttl + for metricName := range b.labelValues { + for hash, lvs := range b.labelValues[metricName] { + if lvs.lastRegisteredAt.Add(metricTtl).Before(now) { + b.Counters.Delete(metricName, lvs.labels) + b.Gauges.Delete(metricName, lvs.labels) + b.Summaries.Delete(metricName, lvs.labels) + b.Histograms.Delete(metricName, lvs.labels) + delete(b.labelValues[metricName], hash) + } + } + } +} + +// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time +func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels) { + _, hasMetric := b.labelValues[metricName] + if !hasMetric { + b.labelValues[metricName] = make(map[uint64]*LabelValues) + } + hash := hashNameAndLabels(metricName, labels) + _, ok := b.labelValues[metricName][hash] + if !ok { + b.labelValues[metricName][hash] = &LabelValues{ + labels: labels, + } + } + now := time.Now() + b.labelValues[metricName][hash].lastRegisteredAt = now +} + func NewExporter(mapper *mapper.MetricMapper) *Exporter { return &Exporter{ - Counters: NewCounterContainer(), - Gauges: NewGaugeContainer(), - Summaries: NewSummaryContainer(mapper), - Histograms: NewHistogramContainer(mapper), - mapper: mapper, + Counters: NewCounterContainer(), + Gauges: NewGaugeContainer(), + Summaries: NewSummaryContainer(mapper), + Histograms: NewHistogramContainer(mapper), + mapper: mapper, + labelValues: make(map[string]map[uint64]*LabelValues, 0), } } From e1a3a5fc32d370e65b144e7e64388feaaf347619 Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Tue, 27 Nov 2018 15:59:01 +0300 Subject: [PATCH 3/8] Configured ttl value for stale metrics Signed-off-by: Ivan Mikheykin --- exporter.go | 24 ++++++++++++------------ pkg/mapper/mapper.go | 6 ++++++ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/exporter.go b/exporter.go index 6ec1d6c..676bb77 100644 --- a/exporter.go +++ b/exporter.go @@ -42,9 +42,6 @@ const ( "consider the effects on your monitoring setup. Error: %s" ) -// TODO move to mapping config -const metricTtl = time.Duration(5 * time.Second) - var ( illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) @@ -202,7 +199,7 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 { buckets = mapping.Buckets } - histogramVec := prometheus.NewHistogramVec( + histogramVec = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: metricName, Help: help, @@ -268,6 +265,7 @@ type Events []Event type LabelValues struct { lastRegisteredAt time.Time labels prometheus.Labels + ttl uint64 } type Exporter struct { @@ -357,8 +355,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { counter.Add(event.Value()) - b.saveLabelValues(metricName, prometheusLabels) - + b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("counter").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -378,8 +375,7 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } - b.saveLabelValues(metricName, prometheusLabels) - + b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("gauge").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -405,7 +401,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - b.saveLabelValues(metricName, prometheusLabels) + b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -421,7 +417,7 @@ func (b *Exporter) handleEvent(event Event) { ) if err == nil { summary.Observe(event.Value()) - b.saveLabelValues(metricName, prometheusLabels) + b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -444,7 +440,10 @@ func (b *Exporter) removeStaleMetrics() { // delete timeseries with expired ttl for metricName := range b.labelValues { for hash, lvs := range b.labelValues[metricName] { - if lvs.lastRegisteredAt.Add(metricTtl).Before(now) { + if lvs.ttl == 0 { + continue + } + if lvs.lastRegisteredAt.Add(time.Duration(lvs.ttl) * time.Second).Before(now) { b.Counters.Delete(metricName, lvs.labels) b.Gauges.Delete(metricName, lvs.labels) b.Summaries.Delete(metricName, lvs.labels) @@ -456,7 +455,7 @@ func (b *Exporter) removeStaleMetrics() { } // saveLabelValues stores label values set to labelValues and update lastRegisteredAt time -func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels) { +func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl uint64) { _, hasMetric := b.labelValues[metricName] if !hasMetric { b.labelValues[metricName] = make(map[uint64]*LabelValues) @@ -466,6 +465,7 @@ func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels) if !ok { b.labelValues[metricName][hash] = &LabelValues{ labels: labels, + ttl: ttl, } } now := time.Now() diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 46e90e6..621f5a2 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -39,6 +39,7 @@ type mapperConfigDefaults struct { Quantiles []metricObjective `yaml:"quantiles"` MatchType MatchType `yaml:"match_type"` GlobDisableOrdering bool `yaml:"glob_disable_ordering"` + Ttl uint64 `yaml:"ttl"` } type MetricMapper struct { @@ -69,6 +70,7 @@ type MetricMapping struct { HelpText string `yaml:"help"` Action ActionType `yaml:"action"` MatchMetricType MetricType `yaml:"match_metric_type"` + Ttl uint64 `yaml:"ttl"` } type metricObjective struct { @@ -177,6 +179,10 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error { currentMapping.Quantiles = n.Defaults.Quantiles } + if currentMapping.Ttl == 0 && n.Defaults.Ttl > 0 { + currentMapping.Ttl = n.Defaults.Ttl + } + } m.mutex.Lock() From 57495db2816c0c3e659557b2496c0eec68089c30 Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Tue, 27 Nov 2018 16:36:15 +0300 Subject: [PATCH 4/8] rename labelsNames to labelNames Signed-off-by: Ivan Mikheykin --- exporter.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter.go b/exporter.go index 676bb77..b813590 100644 --- a/exporter.go +++ b/exporter.go @@ -50,7 +50,7 @@ var ( intBuf = make([]byte, 8) ) -func labelsNames(labels prometheus.Labels) []string { +func labelNames(labels prometheus.Labels) []string { names := make([]string, 0, len(labels)) for labelName := range labels { names = append(names, labelName) @@ -91,7 +91,7 @@ func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: metricName, Help: help, - }, labelsNames(labels)) + }, labelNames(labels)) if err := prometheus.Register(counterVec); err != nil { return nil, err } @@ -122,7 +122,7 @@ func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help s gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: metricName, Help: help, - }, labelsNames(labels)) + }, labelNames(labels)) if err := prometheus.Register(gaugeVec); err != nil { return nil, err } @@ -165,7 +165,7 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help Name: metricName, Help: help, Objectives: objectives, - }, labelsNames(labels)) + }, labelNames(labels)) if err := prometheus.Register(summaryVec); err != nil { return nil, err } @@ -204,7 +204,7 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he Name: metricName, Help: help, Buckets: buckets, - }, labelsNames(labels)) + }, labelNames(labels)) if err := prometheus.Register(histogramVec); err != nil { return nil, err } From d1b2dd47a8a5b90f2241d47d28765b87677e4cea Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Thu, 13 Dec 2018 16:53:40 +0300 Subject: [PATCH 5/8] TTL expiration tests, README update Signed-off-by: Ivan Mikheykin --- README.md | 13 +++ exporter.go | 9 +- exporter_test.go | 225 ++++++++++++++++++++++++++++++++++++-- go.mod | 1 + pkg/mapper/mapper.go | 5 +- pkg/mapper/mapper_test.go | 65 +++++++++++ 6 files changed, 302 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 529fbb1..11177c7 100644 --- a/README.md +++ b/README.md @@ -268,6 +268,7 @@ defaults: buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ] match_type: glob glob_disable_ordering: false + ttl: 0 # metrics not expired mappings: # This will be a histogram using the buckets set in `defaults`. - match: test.timing.*.*.* @@ -350,6 +351,18 @@ mappings: Possible values for `match_metric_type` are `gauge`, `counter` and `timer`. +### Time series expiration + +`ttl` parameter can be used to define expiration time for stale metrics. +Value is a time duration with valid time units: "ns", "us" (or "µs"), +"ms", "s", "m", "h". For example, `ttl: 1m20s`. `0` value is used to indicate +not expired metrics. + +Expiration is useful to gather metrics with changing label values from +dynamic environments like Kubernetes. For example, metric +`ingress_nginx_upstream_retries_count` with label `pod` and +changing pod name as a value for this label. + ## Using Docker You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image. diff --git a/exporter.go b/exporter.go index b813590..6e10213 100644 --- a/exporter.go +++ b/exporter.go @@ -265,7 +265,7 @@ type Events []Event type LabelValues struct { lastRegisteredAt time.Time labels prometheus.Labels - ttl uint64 + ttl time.Duration } type Exporter struct { @@ -315,6 +315,9 @@ func (b *Exporter) handleEvent(event Event) { mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType()) if mapping == nil { mapping = &mapper.MetricMapping{} + if b.mapper.Defaults.Ttl != 0 { + mapping.Ttl = b.mapper.Defaults.Ttl + } } if mapping.Action == mapper.ActionTypeDrop { @@ -443,7 +446,7 @@ func (b *Exporter) removeStaleMetrics() { if lvs.ttl == 0 { continue } - if lvs.lastRegisteredAt.Add(time.Duration(lvs.ttl) * time.Second).Before(now) { + if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) { b.Counters.Delete(metricName, lvs.labels) b.Gauges.Delete(metricName, lvs.labels) b.Summaries.Delete(metricName, lvs.labels) @@ -455,7 +458,7 @@ func (b *Exporter) removeStaleMetrics() { } // saveLabelValues stores label values set to labelValues and update lastRegisteredAt time -func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl uint64) { +func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) { _, hasMetric := b.labelValues[metricName] if !hasMetric { b.labelValues[metricName] = make(map[uint64]*LabelValues) diff --git a/exporter_test.go b/exporter_test.go index 64f3206..eafe8de 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -100,20 +100,15 @@ func TestHistogramUnits(t *testing.T) { ex.Listen(events) - histogram, err := ex.Histograms.Get(name, prometheus.Labels{}, "", nil) + metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { - t.Fatalf("Histogram not registered") + t.Fatalf("Cannot gather from DefaultGatherer: %v", err) } - - // check the state of the histogram by - // (ab)using its Write method (which is usually only used by Prometheus internally). - metric := &dto.Metric{} - histogram.Write(metric) - value := *metric.Histogram.SampleSum - if value == 300 { + value := getFloat64(metrics, name, prometheus.Labels{}) + if *value == 300 { t.Fatalf("Histogram observations not scaled into Seconds") - } else if value != .300 { - t.Fatalf("Received unexpected value for histogram observation %f != .300", value) + } else if *value != .300 { + t.Fatalf("Received unexpected value for histogram observation %f != .300", *value) } } @@ -173,3 +168,211 @@ func TestEscapeMetricName(t *testing.T) { } } } + +// TestTtlExpiration validates expiration of time series. +// foobar metric without mapping should expire with default ttl of 1s +// bazqux metric should expire with ttl of 2s +func TestTtlExpiration(t *testing.T) { + config := ` +defaults: + ttl: 1s +mappings: +- match: bazqux.* + name: bazqux + labels: + first: baz + second: qux + third: $1 + ttl: 2s +` + + bazquxLabels := prometheus.Labels{ + "third": "main", + "first": "baz", + "second": "qux", + } + + testMapper := &mapper.MetricMapper{} + err := testMapper.InitFromYAMLString(config) + if err != nil { + t.Fatalf("Config load error: %s %s", config, err) + } + + ex := NewExporter(testMapper) + for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { + events := make(chan Events, 2) + fatal := make(chan error, 1) // t.Fatal must not be called in goroutines (SA2002) + stop := make(chan bool, 1) + + l.handlePacket([]byte("foobar:200|g"), events) + l.handlePacket([]byte("bazqux.main:42|ms"), events) + + // Close channel to signify we are done with the listener after a short period. + go func() { + defer close(events) + + time.Sleep(time.Millisecond * 100) + + var metrics []*dto.MetricFamily + var foobarValue *float64 + var bazquxValue *float64 + + // Wait to gather both metrics + var tries = 7 + for { + metrics, err = prometheus.DefaultGatherer.Gather() + + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) + if foobarValue != nil && bazquxValue != nil { + break + } + + tries-- + if tries == 0 { + fatal <- fmt.Errorf("Gauge `foobar` and Summary `bazqux` should be gathered") + return + } + time.Sleep(time.Millisecond * 100) + } + + // Check values + if *foobarValue != 200 { + fatal <- fmt.Errorf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) + return + } + if *bazquxValue != 42 { + fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + return + } + + // Wait for expiration of foobar + tries = 20 // 20*100 = 2s + for { + time.Sleep(time.Millisecond * 100) + metrics, err = prometheus.DefaultGatherer.Gather() + + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) + if foobarValue == nil { + break + } + + tries-- + if tries == 0 { + fatal <- fmt.Errorf("Gauge `foobar` should be expired") + return + } + } + + if *bazquxValue != 42 { + fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + return + } + + // Wait for expiration of bazqux + tries = 20 // 20*100 = 2s + for { + time.Sleep(time.Millisecond * 100) + metrics, err = prometheus.DefaultGatherer.Gather() + + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) + if bazquxValue == nil { + break + } + if foobarValue != nil { + fatal <- fmt.Errorf("Gauge `foobar` should not be gathered after expiration") + return + } + + tries-- + if tries == 0 { + fatal <- fmt.Errorf("Summary `bazqux` should be expired") + return + } + } + }() + + go func() { + ex.Listen(events) + stop <- true + }() + + for { + select { + case err := <-fatal: + t.Fatalf("%v", err) + case <-stop: + return + } + } + + } +} + +// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels. +// Method returns a value or nil if metric is not found. +func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 { + var metricFamily *dto.MetricFamily + for _, m := range metrics { + if *m.Name == name { + metricFamily = m + break + } + } + if metricFamily == nil { + return nil + } + + var metric *dto.Metric + labelsHash := hashNameAndLabels(name, labels) + for _, m := range metricFamily.Metric { + h := hashNameAndLabels(name, labelPairsAsLabels(m.GetLabel())) + if h == labelsHash { + metric = m + break + } + } + if metric == nil { + return nil + } + + var value float64 + if metric.Gauge != nil { + value = metric.Gauge.GetValue() + return &value + } + if metric.Counter != nil { + value = metric.Counter.GetValue() + return &value + } + if metric.Histogram != nil { + value = metric.Histogram.GetSampleSum() + return &value + } + if metric.Summary != nil { + value = metric.Summary.GetSampleSum() + return &value + } + if metric.Untyped != nil { + value = metric.Untyped.GetValue() + return &value + } + panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric)) +} + +func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) { + labels = prometheus.Labels{} + for _, pair := range pairs { + if pair.Name == nil { + continue + } + value := "" + if pair.Value != nil { + value = *pair.Value + } + labels[*pair.Name] = value + } + return +} diff --git a/go.mod b/go.mod index b69800f..488a957 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/onsi/gomega v1.4.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v0.9.2 + github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/sergi/go-diff v1.0.0 // indirect github.com/sirupsen/logrus v1.0.3 // indirect diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 621f5a2..b9fe3d7 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper/fsm" yaml "gopkg.in/yaml.v2" + "time" ) var ( @@ -39,7 +40,7 @@ type mapperConfigDefaults struct { Quantiles []metricObjective `yaml:"quantiles"` MatchType MatchType `yaml:"match_type"` GlobDisableOrdering bool `yaml:"glob_disable_ordering"` - Ttl uint64 `yaml:"ttl"` + Ttl time.Duration `yaml:"ttl"` } type MetricMapper struct { @@ -70,7 +71,7 @@ type MetricMapping struct { HelpText string `yaml:"help"` Action ActionType `yaml:"action"` MatchMetricType MetricType `yaml:"match_metric_type"` - Ttl uint64 `yaml:"ttl"` + Ttl time.Duration `yaml:"ttl"` } type metricObjective struct { diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 08e4306..8d1ea49 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -15,6 +15,7 @@ package mapper import ( "testing" + "time" ) type mappings map[string]struct { @@ -22,6 +23,7 @@ type mappings map[string]struct { labels map[string]string quantiles []metricObjective notPresent bool + ttl time.Duration } func TestMetricMapperYAML(t *testing.T) { @@ -603,6 +605,66 @@ mappings: }, }, }, + // Config that has a ttl. + { + config: `mappings: +- match: web.* + name: "web" + ttl: 10s + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Second * 10, + }, + }, + }, + // Config that has a default ttl. + { + config: `defaults: + ttl: 1m2s +mappings: +- match: web.* + name: "web" + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Minute + time.Second*2, + }, + }, + }, + // Config that override a default ttl. + { + config: `defaults: + ttl: 1m2s +mappings: +- match: web.* + name: "web" + ttl: 5s + labels: + site: "$1"`, + mappings: mappings{ + "test.a": {}, + "web.localhost": { + name: "web", + labels: map[string]string{ + "site": "localhost", + }, + ttl: time.Second * 5, + }, + }, + }, } mapper := MetricMapper{} @@ -633,6 +695,9 @@ mappings: t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels) } } + if mapping.ttl > 0 && mapping.ttl != m.Ttl { + t.Fatalf("%d.%q: Expected ttl of %s, got %s", i, metric, mapping.ttl.String(), m.Ttl.String()) + } if len(mapping.quantiles) != 0 { if len(mapping.quantiles) != len(m.Quantiles) { From e550f061f61a9e6aee692bd143197a26d1aea3ea Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Thu, 13 Dec 2018 19:36:19 +0300 Subject: [PATCH 6/8] Use updated ttl value from mapping in saveLabelValues Signed-off-by: Ivan Mikheykin --- exporter.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/exporter.go b/exporter.go index 6e10213..0f66f79 100644 --- a/exporter.go +++ b/exporter.go @@ -457,7 +457,7 @@ func (b *Exporter) removeStaleMetrics() { } } -// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time +// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) { _, hasMetric := b.labelValues[metricName] if !hasMetric { @@ -473,6 +473,8 @@ func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, } now := time.Now() b.labelValues[metricName][hash].lastRegisteredAt = now + // Update ttl from mapping + b.labelValues[metricName][hash].ttl = ttl } func NewExporter(mapper *mapper.MetricMapper) *Exporter { From 331d2a56d073a8d04ccf8dd81edefccf52712a01 Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Tue, 18 Dec 2018 15:16:05 +0300 Subject: [PATCH 7/8] Language fixes in README, reduce map lookups in saveLabelValues. Signed-off-by: Ivan Mikheykin --- README.md | 16 ++++++++-------- exporter.go | 18 ++++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 11177c7..16f2646 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ defaults: buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5 ] match_type: glob glob_disable_ordering: false - ttl: 0 # metrics not expired + ttl: 0 # metrics do not expire mappings: # This will be a histogram using the buckets set in `defaults`. - match: test.timing.*.*.* @@ -353,15 +353,15 @@ Possible values for `match_metric_type` are `gauge`, `counter` and `timer`. ### Time series expiration -`ttl` parameter can be used to define expiration time for stale metrics. -Value is a time duration with valid time units: "ns", "us" (or "µs"), +The `ttl` parameter can be used to define the expiration time for stale metrics. +The value is a time duration with valid time units: "ns", "us" (or "µs"), "ms", "s", "m", "h". For example, `ttl: 1m20s`. `0` value is used to indicate -not expired metrics. +metrics that do not expire. -Expiration is useful to gather metrics with changing label values from -dynamic environments like Kubernetes. For example, metric -`ingress_nginx_upstream_retries_count` with label `pod` and -changing pod name as a value for this label. + TTLs are applied to each mapped metric name/labels combination whenever + new samples are received. This means that you cannot immediately expire a + metric only by changing the mapping configuration. At least one sample must + be received for updated mappings to take effect. ## Using Docker diff --git a/exporter.go b/exporter.go index 0f66f79..943808d 100644 --- a/exporter.go +++ b/exporter.go @@ -149,7 +149,7 @@ func NewSummaryContainer(mapper *mapper.MetricMapper) *SummaryContainer { } } -func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Summary, error) { +func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { summaryVec, ok := c.Elements[metricName] if !ok { quantiles := c.mapper.Defaults.Quantiles @@ -192,7 +192,7 @@ func NewHistogramContainer(mapper *mapper.MetricMapper) *HistogramContainer { } } -func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Histogram, error) { +func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { histogramVec, ok := c.Elements[metricName] if !ok { buckets := c.mapper.Defaults.Buckets @@ -459,22 +459,24 @@ func (b *Exporter) removeStaleMetrics() { // saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) { - _, hasMetric := b.labelValues[metricName] + metric, hasMetric := b.labelValues[metricName] if !hasMetric { - b.labelValues[metricName] = make(map[uint64]*LabelValues) + metric = make(map[uint64]*LabelValues) + b.labelValues[metricName] = metric } hash := hashNameAndLabels(metricName, labels) - _, ok := b.labelValues[metricName][hash] + metricLabelValues, ok := metric[hash] if !ok { - b.labelValues[metricName][hash] = &LabelValues{ + metricLabelValues = &LabelValues{ labels: labels, ttl: ttl, } + b.labelValues[metricName][hash] = metricLabelValues } now := time.Now() - b.labelValues[metricName][hash].lastRegisteredAt = now + metricLabelValues.lastRegisteredAt = now // Update ttl from mapping - b.labelValues[metricName][hash].ttl = ttl + metricLabelValues.ttl = ttl } func NewExporter(mapper *mapper.MetricMapper) *Exporter { From 699c11ca11ff408040e26973e81939582971ce9d Mon Sep 17 00:00:00 2001 From: Ivan Mikheykin Date: Wed, 19 Dec 2018 08:21:43 +0300 Subject: [PATCH 8/8] Rework tests to not depend on actual wall clocks Signed-off-by: Ivan Mikheykin --- exporter.go | 7 +- exporter_test.go | 279 +++++++++++++++++++++------------------------ pkg/clock/clock.go | 28 +++++ 3 files changed, 161 insertions(+), 153 deletions(-) create mode 100644 pkg/clock/clock.go diff --git a/exporter.go b/exporter.go index 943808d..13c98f1 100644 --- a/exporter.go +++ b/exporter.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -291,7 +292,7 @@ func escapeMetricName(metricName string) string { // Listen handles all events sent to the given channel sequentially. It // terminates when the channel is closed. func (b *Exporter) Listen(e <-chan Events) { - removeStaleMetricsTicker := time.NewTicker(time.Second) + removeStaleMetricsTicker := clock.NewTicker(time.Second) for { select { @@ -439,7 +440,7 @@ func (b *Exporter) handleEvent(event Event) { // removeStaleMetrics removes label values set from metric with stale values func (b *Exporter) removeStaleMetrics() { - now := time.Now() + now := clock.Now() // delete timeseries with expired ttl for metricName := range b.labelValues { for hash, lvs := range b.labelValues[metricName] { @@ -473,7 +474,7 @@ func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, } b.labelValues[metricName][hash] = metricLabelValues } - now := time.Now() + now := clock.Now() metricLabelValues.lastRegisteredAt = now // Update ttl from mapping metricLabelValues.ttl = ttl diff --git a/exporter_test.go b/exporter_test.go index eafe8de..b4615b8 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -39,22 +40,19 @@ func TestNegativeCounter(t *testing.T) { } }() - events := make(chan Events, 1) - c := Events{ - &CounterEvent{ - metricName: "foo", - value: -1, - }, - } - events <- c - ex := NewExporter(&mapper.MetricMapper{}) - - // Close channel to signify we are done with the listener after a short period. + events := make(chan Events, 0) go func() { - time.Sleep(time.Millisecond * 100) + c := Events{ + &CounterEvent{ + metricName: "foo", + value: -1, + }, + } + events <- c close(events) }() + ex := NewExporter(&mapper.MetricMapper{}) ex.Listen(events) } @@ -63,24 +61,37 @@ func TestNegativeCounter(t *testing.T) { // It sends the same tags first with a valid value, then with an invalid one. // The exporter should not panic, but drop the invalid event func TestInvalidUtf8InDatadogTagValue(t *testing.T) { + defer func() { + if e := recover(); e != nil { + err := e.(error) + t.Fatalf("Exporter listener should not panic on bad utf8: %q", err.Error()) + } + }() + + events := make(chan Events, 0) + + go func() { + for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { + l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events) + } + close(events) + }() + ex := NewExporter(&mapper.MetricMapper{}) - for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { - events := make(chan Events, 2) - - l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events) - - // Close channel to signify we are done with the listener after a short period. - go func() { - time.Sleep(time.Millisecond * 100) - close(events) - }() - - ex.Listen(events) - } + ex.Listen(events) } func TestHistogramUnits(t *testing.T) { - events := make(chan Events, 1) + // Start exporter with a synchronous channel + events := make(chan Events, 0) + go func() { + ex := NewExporter(&mapper.MetricMapper{}) + ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram + ex.Listen(events) + }() + + // Synchronously send a statsd event to wait for handleEvent execution. + // Then close events channel to stop a listener. name := "foo" c := Events{ &TimerEvent{ @@ -89,22 +100,18 @@ func TestHistogramUnits(t *testing.T) { }, } events <- c - ex := NewExporter(&mapper.MetricMapper{}) - ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram - - // Close channel to signify we are done with the listener after a short period. - go func() { - time.Sleep(time.Millisecond * 100) - close(events) - }() - - ex.Listen(events) + events <- Events{} + close(events) + // Check histogram value metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { t.Fatalf("Cannot gather from DefaultGatherer: %v", err) } value := getFloat64(metrics, name, prometheus.Labels{}) + if value == nil { + t.Fatal("Histogram value should not be nil") + } if *value == 300 { t.Fatalf("Histogram observations not scaled into Seconds") } else if *value != .300 { @@ -173,141 +180,113 @@ func TestEscapeMetricName(t *testing.T) { // foobar metric without mapping should expire with default ttl of 1s // bazqux metric should expire with ttl of 2s func TestTtlExpiration(t *testing.T) { + // Mock a time.NewTicker + tickerCh := make(chan time.Time, 0) + clock.ClockInstance = &clock.Clock{ + TickerCh: tickerCh, + } + config := ` defaults: ttl: 1s mappings: - match: bazqux.* name: bazqux - labels: - first: baz - second: qux - third: $1 ttl: 2s ` - - bazquxLabels := prometheus.Labels{ - "third": "main", - "first": "baz", - "second": "qux", - } - + // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} err := testMapper.InitFromYAMLString(config) if err != nil { t.Fatalf("Config load error: %s %s", config, err) } + events := make(chan Events, 0) + defer close(events) + go func() { + ex := NewExporter(testMapper) + ex.Listen(events) + }() - ex := NewExporter(testMapper) - for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { - events := make(chan Events, 2) - fatal := make(chan error, 1) // t.Fatal must not be called in goroutines (SA2002) - stop := make(chan bool, 1) + ev := Events{ + // event with default ttl = 1s + &GaugeEvent{ + metricName: "foobar", + value: 200, + }, + // event with ttl = 2s from a mapping + &TimerEvent{ + metricName: "bazqux.main", + value: 42, + }, + } - l.handlePacket([]byte("foobar:200|g"), events) - l.handlePacket([]byte("bazqux.main:42|ms"), events) + var metrics []*dto.MetricFamily + var foobarValue *float64 + var bazquxValue *float64 - // Close channel to signify we are done with the listener after a short period. - go func() { - defer close(events) + // Step 1. Send events with statsd metrics. + // Send empty Events to wait for events are handled. + // saveLabelValues will use fake instant as a lastRegisteredAt time. + clock.ClockInstance.Instant = time.Unix(0, 0) + events <- ev + events <- Events{} - time.Sleep(time.Millisecond * 100) + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue == nil || bazquxValue == nil { + t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered") + } + if *foobarValue != 200 { + t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } - var metrics []*dto.MetricFamily - var foobarValue *float64 - var bazquxValue *float64 + // Step 2. Increase Instant to emulate metrics expiration after 1s + clock.ClockInstance.Instant = time.Unix(1, 10) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- Events{} - // Wait to gather both metrics - var tries = 7 - for { - metrics, err = prometheus.DefaultGatherer.Gather() + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should be expired") + } + if bazquxValue == nil { + t.Fatalf("Summary `bazqux` should be gathered") + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } - foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) - bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) - if foobarValue != nil && bazquxValue != nil { - break - } - - tries-- - if tries == 0 { - fatal <- fmt.Errorf("Gauge `foobar` and Summary `bazqux` should be gathered") - return - } - time.Sleep(time.Millisecond * 100) - } - - // Check values - if *foobarValue != 200 { - fatal <- fmt.Errorf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) - return - } - if *bazquxValue != 42 { - fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) - return - } - - // Wait for expiration of foobar - tries = 20 // 20*100 = 2s - for { - time.Sleep(time.Millisecond * 100) - metrics, err = prometheus.DefaultGatherer.Gather() - - foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) - bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) - if foobarValue == nil { - break - } - - tries-- - if tries == 0 { - fatal <- fmt.Errorf("Gauge `foobar` should be expired") - return - } - } - - if *bazquxValue != 42 { - fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) - return - } - - // Wait for expiration of bazqux - tries = 20 // 20*100 = 2s - for { - time.Sleep(time.Millisecond * 100) - metrics, err = prometheus.DefaultGatherer.Gather() - - foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) - bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels) - if bazquxValue == nil { - break - } - if foobarValue != nil { - fatal <- fmt.Errorf("Gauge `foobar` should not be gathered after expiration") - return - } - - tries-- - if tries == 0 { - fatal <- fmt.Errorf("Summary `bazqux` should be expired") - return - } - } - }() - - go func() { - ex.Listen(events) - stop <- true - }() - - for { - select { - case err := <-fatal: - t.Fatalf("%v", err) - case <-stop: - return - } - } + // Step 3. Increase Instant to emulate metrics expiration after 2s + clock.ClockInstance.Instant = time.Unix(2, 200) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- Events{} + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if bazquxValue != nil { + t.Fatalf("Summary `bazqux` should be expired") + } + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should not be gathered after expiration") } } diff --git a/pkg/clock/clock.go b/pkg/clock/clock.go new file mode 100644 index 0000000..2dc8394 --- /dev/null +++ b/pkg/clock/clock.go @@ -0,0 +1,28 @@ +package clock + +import ( + "time" +) + +var ClockInstance *Clock + +type Clock struct { + Instant time.Time + TickerCh chan time.Time +} + +func Now() time.Time { + if ClockInstance == nil { + return time.Now() + } + return ClockInstance.Instant +} + +func NewTicker(d time.Duration) *time.Ticker { + if ClockInstance == nil || ClockInstance.TickerCh == nil { + return time.NewTicker(d) + } + return &time.Ticker{ + C: ClockInstance.TickerCh, + } +}