mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2025-02-02 09:32:20 +00:00
Rework tests to not depend on actual wall clocks
Signed-off-by: Ivan Mikheykin <ivan.mikheykin@flant.com>
This commit is contained in:
parent
331d2a56d0
commit
699c11ca11
3 changed files with 161 additions and 153 deletions
|
@ -32,6 +32,7 @@ import (
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -291,7 +292,7 @@ func escapeMetricName(metricName string) string {
|
||||||
// Listen handles all events sent to the given channel sequentially. It
|
// Listen handles all events sent to the given channel sequentially. It
|
||||||
// terminates when the channel is closed.
|
// terminates when the channel is closed.
|
||||||
func (b *Exporter) Listen(e <-chan Events) {
|
func (b *Exporter) Listen(e <-chan Events) {
|
||||||
removeStaleMetricsTicker := time.NewTicker(time.Second)
|
removeStaleMetricsTicker := clock.NewTicker(time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -439,7 +440,7 @@ func (b *Exporter) handleEvent(event Event) {
|
||||||
|
|
||||||
// removeStaleMetrics removes label values set from metric with stale values
|
// removeStaleMetrics removes label values set from metric with stale values
|
||||||
func (b *Exporter) removeStaleMetrics() {
|
func (b *Exporter) removeStaleMetrics() {
|
||||||
now := time.Now()
|
now := clock.Now()
|
||||||
// delete timeseries with expired ttl
|
// delete timeseries with expired ttl
|
||||||
for metricName := range b.labelValues {
|
for metricName := range b.labelValues {
|
||||||
for hash, lvs := range b.labelValues[metricName] {
|
for hash, lvs := range b.labelValues[metricName] {
|
||||||
|
@ -473,7 +474,7 @@ func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels,
|
||||||
}
|
}
|
||||||
b.labelValues[metricName][hash] = metricLabelValues
|
b.labelValues[metricName][hash] = metricLabelValues
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := clock.Now()
|
||||||
metricLabelValues.lastRegisteredAt = now
|
metricLabelValues.lastRegisteredAt = now
|
||||||
// Update ttl from mapping
|
// Update ttl from mapping
|
||||||
metricLabelValues.ttl = ttl
|
metricLabelValues.ttl = ttl
|
||||||
|
|
279
exporter_test.go
279
exporter_test.go
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -39,22 +40,19 @@ func TestNegativeCounter(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
events := make(chan Events, 1)
|
events := make(chan Events, 0)
|
||||||
c := Events{
|
|
||||||
&CounterEvent{
|
|
||||||
metricName: "foo",
|
|
||||||
value: -1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
events <- c
|
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(time.Millisecond * 100)
|
c := Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: "foo",
|
||||||
|
value: -1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
events <- c
|
||||||
close(events)
|
close(events)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
ex.Listen(events)
|
ex.Listen(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,24 +61,37 @@ func TestNegativeCounter(t *testing.T) {
|
||||||
// It sends the same tags first with a valid value, then with an invalid one.
|
// It sends the same tags first with a valid value, then with an invalid one.
|
||||||
// The exporter should not panic, but drop the invalid event
|
// The exporter should not panic, but drop the invalid event
|
||||||
func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if e := recover(); e != nil {
|
||||||
|
err := e.(error)
|
||||||
|
t.Fatalf("Exporter listener should not panic on bad utf8: %q", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
||||||
|
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
|
||||||
|
}
|
||||||
|
close(events)
|
||||||
|
}()
|
||||||
|
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
ex.Listen(events)
|
||||||
events := make(chan Events, 2)
|
|
||||||
|
|
||||||
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
|
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
close(events)
|
|
||||||
}()
|
|
||||||
|
|
||||||
ex.Listen(events)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHistogramUnits(t *testing.T) {
|
func TestHistogramUnits(t *testing.T) {
|
||||||
events := make(chan Events, 1)
|
// Start exporter with a synchronous channel
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
go func() {
|
||||||
|
ex := NewExporter(&mapper.MetricMapper{})
|
||||||
|
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
|
||||||
|
ex.Listen(events)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Synchronously send a statsd event to wait for handleEvent execution.
|
||||||
|
// Then close events channel to stop a listener.
|
||||||
name := "foo"
|
name := "foo"
|
||||||
c := Events{
|
c := Events{
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
|
@ -89,22 +100,18 @@ func TestHistogramUnits(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
events <- c
|
events <- c
|
||||||
ex := NewExporter(&mapper.MetricMapper{})
|
events <- Events{}
|
||||||
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
|
close(events)
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
close(events)
|
|
||||||
}()
|
|
||||||
|
|
||||||
ex.Listen(events)
|
|
||||||
|
|
||||||
|
// Check histogram value
|
||||||
metrics, err := prometheus.DefaultGatherer.Gather()
|
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
||||||
}
|
}
|
||||||
value := getFloat64(metrics, name, prometheus.Labels{})
|
value := getFloat64(metrics, name, prometheus.Labels{})
|
||||||
|
if value == nil {
|
||||||
|
t.Fatal("Histogram value should not be nil")
|
||||||
|
}
|
||||||
if *value == 300 {
|
if *value == 300 {
|
||||||
t.Fatalf("Histogram observations not scaled into Seconds")
|
t.Fatalf("Histogram observations not scaled into Seconds")
|
||||||
} else if *value != .300 {
|
} else if *value != .300 {
|
||||||
|
@ -173,141 +180,113 @@ func TestEscapeMetricName(t *testing.T) {
|
||||||
// foobar metric without mapping should expire with default ttl of 1s
|
// foobar metric without mapping should expire with default ttl of 1s
|
||||||
// bazqux metric should expire with ttl of 2s
|
// bazqux metric should expire with ttl of 2s
|
||||||
func TestTtlExpiration(t *testing.T) {
|
func TestTtlExpiration(t *testing.T) {
|
||||||
|
// Mock a time.NewTicker
|
||||||
|
tickerCh := make(chan time.Time, 0)
|
||||||
|
clock.ClockInstance = &clock.Clock{
|
||||||
|
TickerCh: tickerCh,
|
||||||
|
}
|
||||||
|
|
||||||
config := `
|
config := `
|
||||||
defaults:
|
defaults:
|
||||||
ttl: 1s
|
ttl: 1s
|
||||||
mappings:
|
mappings:
|
||||||
- match: bazqux.*
|
- match: bazqux.*
|
||||||
name: bazqux
|
name: bazqux
|
||||||
labels:
|
|
||||||
first: baz
|
|
||||||
second: qux
|
|
||||||
third: $1
|
|
||||||
ttl: 2s
|
ttl: 2s
|
||||||
`
|
`
|
||||||
|
// Create mapper from config and start an Exporter with a synchronous channel
|
||||||
bazquxLabels := prometheus.Labels{
|
|
||||||
"third": "main",
|
|
||||||
"first": "baz",
|
|
||||||
"second": "qux",
|
|
||||||
}
|
|
||||||
|
|
||||||
testMapper := &mapper.MetricMapper{}
|
testMapper := &mapper.MetricMapper{}
|
||||||
err := testMapper.InitFromYAMLString(config)
|
err := testMapper.InitFromYAMLString(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Config load error: %s %s", config, err)
|
t.Fatalf("Config load error: %s %s", config, err)
|
||||||
}
|
}
|
||||||
|
events := make(chan Events, 0)
|
||||||
|
defer close(events)
|
||||||
|
go func() {
|
||||||
|
ex := NewExporter(testMapper)
|
||||||
|
ex.Listen(events)
|
||||||
|
}()
|
||||||
|
|
||||||
ex := NewExporter(testMapper)
|
ev := Events{
|
||||||
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
// event with default ttl = 1s
|
||||||
events := make(chan Events, 2)
|
&GaugeEvent{
|
||||||
fatal := make(chan error, 1) // t.Fatal must not be called in goroutines (SA2002)
|
metricName: "foobar",
|
||||||
stop := make(chan bool, 1)
|
value: 200,
|
||||||
|
},
|
||||||
|
// event with ttl = 2s from a mapping
|
||||||
|
&TimerEvent{
|
||||||
|
metricName: "bazqux.main",
|
||||||
|
value: 42,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
l.handlePacket([]byte("foobar:200|g"), events)
|
var metrics []*dto.MetricFamily
|
||||||
l.handlePacket([]byte("bazqux.main:42|ms"), events)
|
var foobarValue *float64
|
||||||
|
var bazquxValue *float64
|
||||||
|
|
||||||
// Close channel to signify we are done with the listener after a short period.
|
// Step 1. Send events with statsd metrics.
|
||||||
go func() {
|
// Send empty Events to wait for events are handled.
|
||||||
defer close(events)
|
// saveLabelValues will use fake instant as a lastRegisteredAt time.
|
||||||
|
clock.ClockInstance.Instant = time.Unix(0, 0)
|
||||||
|
events <- ev
|
||||||
|
events <- Events{}
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
// Check values
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if foobarValue == nil || bazquxValue == nil {
|
||||||
|
t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered")
|
||||||
|
}
|
||||||
|
if *foobarValue != 200 {
|
||||||
|
t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
|
||||||
|
}
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
}
|
||||||
|
|
||||||
var metrics []*dto.MetricFamily
|
// Step 2. Increase Instant to emulate metrics expiration after 1s
|
||||||
var foobarValue *float64
|
clock.ClockInstance.Instant = time.Unix(1, 10)
|
||||||
var bazquxValue *float64
|
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
|
||||||
|
events <- Events{}
|
||||||
|
|
||||||
// Wait to gather both metrics
|
// Check values
|
||||||
var tries = 7
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
for {
|
if err != nil {
|
||||||
metrics, err = prometheus.DefaultGatherer.Gather()
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if foobarValue != nil {
|
||||||
|
t.Fatalf("Gauge `foobar` should be expired")
|
||||||
|
}
|
||||||
|
if bazquxValue == nil {
|
||||||
|
t.Fatalf("Summary `bazqux` should be gathered")
|
||||||
|
}
|
||||||
|
if *bazquxValue != 42 {
|
||||||
|
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
||||||
|
}
|
||||||
|
|
||||||
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
// Step 3. Increase Instant to emulate metrics expiration after 2s
|
||||||
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
clock.ClockInstance.Instant = time.Unix(2, 200)
|
||||||
if foobarValue != nil && bazquxValue != nil {
|
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
|
||||||
break
|
events <- Events{}
|
||||||
}
|
|
||||||
|
|
||||||
tries--
|
|
||||||
if tries == 0 {
|
|
||||||
fatal <- fmt.Errorf("Gauge `foobar` and Summary `bazqux` should be gathered")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check values
|
|
||||||
if *foobarValue != 200 {
|
|
||||||
fatal <- fmt.Errorf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if *bazquxValue != 42 {
|
|
||||||
fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for expiration of foobar
|
|
||||||
tries = 20 // 20*100 = 2s
|
|
||||||
for {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
metrics, err = prometheus.DefaultGatherer.Gather()
|
|
||||||
|
|
||||||
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
|
||||||
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
|
||||||
if foobarValue == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
tries--
|
|
||||||
if tries == 0 {
|
|
||||||
fatal <- fmt.Errorf("Gauge `foobar` should be expired")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if *bazquxValue != 42 {
|
|
||||||
fatal <- fmt.Errorf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for expiration of bazqux
|
|
||||||
tries = 20 // 20*100 = 2s
|
|
||||||
for {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
metrics, err = prometheus.DefaultGatherer.Gather()
|
|
||||||
|
|
||||||
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
|
||||||
bazquxValue = getFloat64(metrics, "bazqux", bazquxLabels)
|
|
||||||
if bazquxValue == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if foobarValue != nil {
|
|
||||||
fatal <- fmt.Errorf("Gauge `foobar` should not be gathered after expiration")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
tries--
|
|
||||||
if tries == 0 {
|
|
||||||
fatal <- fmt.Errorf("Summary `bazqux` should be expired")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
ex.Listen(events)
|
|
||||||
stop <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case err := <-fatal:
|
|
||||||
t.Fatalf("%v", err)
|
|
||||||
case <-stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Check values
|
||||||
|
metrics, err = prometheus.DefaultGatherer.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Gather should not fail")
|
||||||
|
}
|
||||||
|
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
|
||||||
|
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
|
||||||
|
if bazquxValue != nil {
|
||||||
|
t.Fatalf("Summary `bazqux` should be expired")
|
||||||
|
}
|
||||||
|
if foobarValue != nil {
|
||||||
|
t.Fatalf("Gauge `foobar` should not be gathered after expiration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
28
pkg/clock/clock.go
Normal file
28
pkg/clock/clock.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package clock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ClockInstance *Clock
|
||||||
|
|
||||||
|
type Clock struct {
|
||||||
|
Instant time.Time
|
||||||
|
TickerCh chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func Now() time.Time {
|
||||||
|
if ClockInstance == nil {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
return ClockInstance.Instant
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTicker(d time.Duration) *time.Ticker {
|
||||||
|
if ClockInstance == nil || ClockInstance.TickerCh == nil {
|
||||||
|
return time.NewTicker(d)
|
||||||
|
}
|
||||||
|
return &time.Ticker{
|
||||||
|
C: ClockInstance.TickerCh,
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue