diff --git a/pkg/event/event.go b/pkg/event/event.go index d5e65ce..66fc40f 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -18,6 +18,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -39,6 +40,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName } func (c *CounterEvent) Value() float64 { return c.CValue } func (c *CounterEvent) Labels() map[string]string { return c.CLabels } func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } +func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} } type GaugeEvent struct { GMetricName string @@ -51,6 +53,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName } func (g *GaugeEvent) Value() float64 { return g.GValue } func (g *GaugeEvent) Labels() map[string]string { return g.GLabels } func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } +func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} } type ObserverEvent struct { OMetricName string @@ -62,6 +65,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName } func (o *ObserverEvent) Value() float64 { return o.OValue } func (o *ObserverEvent) Labels() map[string]string { return o.OLabels } func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} } type Events []Event @@ -136,3 +140,87 @@ type UnbufferedEventHandler struct { func (ueh *UnbufferedEventHandler) Queue(events Events) { ueh.C <- events } + +// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface. +type MultiValueEvent interface { + MetricName() string + Labels() map[string]string + MetricType() mapper.MetricType + Values() []float64 +} + +type MultiObserverEvent struct { + OMetricName string + OValues []float64 // DataDog extensions allow multiple values in a single sample + OLabels map[string]string + SampleRate float64 +} + +type ExpandableEvent interface { + Expand() []Event +} + +func (m *MultiObserverEvent) MetricName() string { return m.OMetricName } +func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] } +func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels } +func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (m *MultiObserverEvent) Values() []float64 { return m.OValues } + +// Expand returns a list of events that are the result of expanding the multi-value event. +// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events. +// And keep the exporter code compatible with previous versions. +func (m *MultiObserverEvent) Expand() []Event { + if len(m.OValues) == 1 && m.SampleRate == 0 { + return []Event{ + &ObserverEvent{ + OMetricName: m.OMetricName, + OValue: m.OValues[0], + OLabels: copyLabels(m.OLabels), + }, + } + } + + events := make([]Event, 0, len(m.OValues)) + for _, value := range m.OValues { + events = append(events, &ObserverEvent{ + OMetricName: m.OMetricName, + OValue: value, + OLabels: copyLabels(m.OLabels), + }) + } + + if m.SampleRate > 0 && m.SampleRate < 1 { + multiplier := int(1 / m.SampleRate) + multipliedEvents := make([]Event, 0, len(events)*multiplier) + for i := 0; i < multiplier; i++ { + for _, event := range events { + e := event.(*ObserverEvent) + multipliedEvents = append(multipliedEvents, &ObserverEvent{ + OMetricName: e.OMetricName, + OValue: e.OValue, + OLabels: copyLabels(e.OLabels), + }) + } + } + return multipliedEvents + } + + return events +} + +// Helper function to copy labels map +func copyLabels(labels map[string]string) map[string]string { + newLabels := make(map[string]string, len(labels)) + for k, v := range labels { + newLabels[k] = v + } + return newLabels +} + +var ( + _ ExpandableEvent = &MultiObserverEvent{} + _ MultiValueEvent = &MultiObserverEvent{} + _ MultiValueEvent = &CounterEvent{} + _ MultiValueEvent = &GaugeEvent{} + _ MultiValueEvent = &ObserverEvent{} +) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 192ce29..71cc8ad 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -14,11 +14,15 @@ package event import ( + "fmt" + "reflect" "testing" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" ) var eventsFlushed = prometheus.NewCounter( @@ -85,3 +89,206 @@ func TestEventIntervalFlush(t *testing.T) { t.Fatal("Expected 10 events in the event channel, but got", len(events)) } } + +func TestMultiValueEvent(t *testing.T) { + tests := []struct { + name string + event MultiValueEvent + wantValues []float64 + wantName string + wantType mapper.MetricType + wantLabels map[string]string + }{ + { + name: "MultiObserverEvent with single value", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantValues: []float64{1.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "MultiObserverEvent with multiple values", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantValues: []float64{1.0, 2.0, 3.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "CounterEvent implements MultiValueEvent", + event: &CounterEvent{ + CMetricName: "test_counter", + CValue: 42.0, + CLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{42.0}, + wantName: "test_counter", + wantType: mapper.MetricTypeCounter, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "GaugeEvent implements MultiValueEvent", + event: &GaugeEvent{ + GMetricName: "test_gauge", + GValue: 123.0, + GLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{123.0}, + wantName: "test_gauge", + wantType: mapper.MetricTypeGauge, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "ObserverEvent implements MultiValueEvent", + event: &ObserverEvent{ + OMetricName: "test_observer", + OValue: 99.0, + OLabels: map[string]string{"label": "value"}, + }, + wantValues: []float64{99.0}, + wantName: "test_observer", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) { + t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues) + } + if got := tt.event.MetricName(); got != tt.wantName { + t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName) + } + if got := tt.event.MetricType(); got != tt.wantType { + t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType) + } + if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) { + t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels) + } + }) + } +} + +func TestMultiObserverEvent_Expand(t *testing.T) { + t.Parallel() + tests := []struct { + name string + event *MultiObserverEvent + wantEvents []Event + }{ + { + name: "single value no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + { + name: "multiple values no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 3.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + { + name: "multiple values with sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got := tt.event.Expand() + if len(tt.wantEvents) != len(got) { + t.Fatalf("Expected %d events, but got %d", len(tt.wantEvents), len(got)) + } + + eventCount := func(events []Event) map[string]int { + counts := make(map[string]int) + for _, event := range events { + oe := event.(*ObserverEvent) + key := fmt.Sprintf("%s%f%v", oe.OMetricName, oe.OValue, oe.OLabels) + counts[key]++ + } + return counts + } + + wantMap := eventCount(tt.wantEvents) + gotMap := eventCount(got) + + for key, count := range wantMap { + if gotMap[key] != count { + t.Fatalf("Event mismatch for key %v: expected %d, got %d", key, count, gotMap[key]) + } + } + }) + } +}