Merge pull request #602 from prometheus/feat/multi-valued-event

feat(event): Add MultiValueEvent interface and MultiObserverEvent implementation
This commit is contained in:
Pedro Tanaka 2025-02-22 14:47:19 +01:00 committed by GitHub
commit 45044cbc48
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 295 additions and 0 deletions

View file

@ -18,6 +18,7 @@ import (
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
) )
@ -39,6 +40,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName }
func (c *CounterEvent) Value() float64 { return c.CValue } func (c *CounterEvent) Value() float64 { return c.CValue }
func (c *CounterEvent) Labels() map[string]string { return c.CLabels } func (c *CounterEvent) Labels() map[string]string { return c.CLabels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} }
type GaugeEvent struct { type GaugeEvent struct {
GMetricName string GMetricName string
@ -51,6 +53,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName }
func (g *GaugeEvent) Value() float64 { return g.GValue } func (g *GaugeEvent) Value() float64 { return g.GValue }
func (g *GaugeEvent) Labels() map[string]string { return g.GLabels } func (g *GaugeEvent) Labels() map[string]string { return g.GLabels }
func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} }
type ObserverEvent struct { type ObserverEvent struct {
OMetricName string OMetricName string
@ -62,6 +65,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName }
func (o *ObserverEvent) Value() float64 { return o.OValue } func (o *ObserverEvent) Value() float64 { return o.OValue }
func (o *ObserverEvent) Labels() map[string]string { return o.OLabels } func (o *ObserverEvent) Labels() map[string]string { return o.OLabels }
func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} }
type Events []Event type Events []Event
@ -136,3 +140,87 @@ type UnbufferedEventHandler struct {
func (ueh *UnbufferedEventHandler) Queue(events Events) { func (ueh *UnbufferedEventHandler) Queue(events Events) {
ueh.C <- events ueh.C <- events
} }
// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface.
type MultiValueEvent interface {
MetricName() string
Labels() map[string]string
MetricType() mapper.MetricType
Values() []float64
}
type MultiObserverEvent struct {
OMetricName string
OValues []float64 // DataDog extensions allow multiple values in a single sample
OLabels map[string]string
SampleRate float64
}
type ExpandableEvent interface {
Expand() []Event
}
func (m *MultiObserverEvent) MetricName() string { return m.OMetricName }
func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] }
func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels }
func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
func (m *MultiObserverEvent) Values() []float64 { return m.OValues }
// Expand returns a list of events that are the result of expanding the multi-value event.
// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events.
// And keep the exporter code compatible with previous versions.
func (m *MultiObserverEvent) Expand() []Event {
if len(m.OValues) == 1 && m.SampleRate == 0 {
return []Event{
&ObserverEvent{
OMetricName: m.OMetricName,
OValue: m.OValues[0],
OLabels: copyLabels(m.OLabels),
},
}
}
events := make([]Event, 0, len(m.OValues))
for _, value := range m.OValues {
events = append(events, &ObserverEvent{
OMetricName: m.OMetricName,
OValue: value,
OLabels: copyLabels(m.OLabels),
})
}
if m.SampleRate > 0 && m.SampleRate < 1 {
multiplier := int(1 / m.SampleRate)
multipliedEvents := make([]Event, 0, len(events)*multiplier)
for i := 0; i < multiplier; i++ {
for _, event := range events {
e := event.(*ObserverEvent)
multipliedEvents = append(multipliedEvents, &ObserverEvent{
OMetricName: e.OMetricName,
OValue: e.OValue,
OLabels: copyLabels(e.OLabels),
})
}
}
return multipliedEvents
}
return events
}
// Helper function to copy labels map
func copyLabels(labels map[string]string) map[string]string {
newLabels := make(map[string]string, len(labels))
for k, v := range labels {
newLabels[k] = v
}
return newLabels
}
var (
_ ExpandableEvent = &MultiObserverEvent{}
_ MultiValueEvent = &MultiObserverEvent{}
_ MultiValueEvent = &CounterEvent{}
_ MultiValueEvent = &GaugeEvent{}
_ MultiValueEvent = &ObserverEvent{}
)

View file

@ -14,11 +14,15 @@
package event package event
import ( import (
"fmt"
"reflect"
"testing" "testing"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
) )
var eventsFlushed = prometheus.NewCounter( var eventsFlushed = prometheus.NewCounter(
@ -85,3 +89,206 @@ func TestEventIntervalFlush(t *testing.T) {
t.Fatal("Expected 10 events in the event channel, but got", len(events)) t.Fatal("Expected 10 events in the event channel, but got", len(events))
} }
} }
func TestMultiValueEvent(t *testing.T) {
tests := []struct {
name string
event MultiValueEvent
wantValues []float64
wantName string
wantType mapper.MetricType
wantLabels map[string]string
}{
{
name: "MultiObserverEvent with single value",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantValues: []float64{1.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
{
name: "MultiObserverEvent with multiple values",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantValues: []float64{1.0, 2.0, 3.0},
wantName: "test_metric",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
{
name: "CounterEvent implements MultiValueEvent",
event: &CounterEvent{
CMetricName: "test_counter",
CValue: 42.0,
CLabels: map[string]string{"label": "value"},
},
wantValues: []float64{42.0},
wantName: "test_counter",
wantType: mapper.MetricTypeCounter,
wantLabels: map[string]string{"label": "value"},
},
{
name: "GaugeEvent implements MultiValueEvent",
event: &GaugeEvent{
GMetricName: "test_gauge",
GValue: 123.0,
GLabels: map[string]string{"label": "value"},
},
wantValues: []float64{123.0},
wantName: "test_gauge",
wantType: mapper.MetricTypeGauge,
wantLabels: map[string]string{"label": "value"},
},
{
name: "ObserverEvent implements MultiValueEvent",
event: &ObserverEvent{
OMetricName: "test_observer",
OValue: 99.0,
OLabels: map[string]string{"label": "value"},
},
wantValues: []float64{99.0},
wantName: "test_observer",
wantType: mapper.MetricTypeObserver,
wantLabels: map[string]string{"label": "value"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) {
t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues)
}
if got := tt.event.MetricName(); got != tt.wantName {
t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName)
}
if got := tt.event.MetricType(); got != tt.wantType {
t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType)
}
if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) {
t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels)
}
})
}
}
func TestMultiObserverEvent_Expand(t *testing.T) {
t.Parallel()
tests := []struct {
name string
event *MultiObserverEvent
wantEvents []Event
}{
{
name: "single value no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
},
},
{
name: "multiple values no sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0, 3.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 3.0,
OLabels: map[string]string{"label": "value"},
},
},
},
{
name: "multiple values with sampling",
event: &MultiObserverEvent{
OMetricName: "test_metric",
OValues: []float64{1.0, 2.0},
OLabels: map[string]string{"label": "value"},
SampleRate: 0.5,
},
wantEvents: []Event{
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 1.0,
OLabels: map[string]string{"label": "value"},
},
&ObserverEvent{
OMetricName: "test_metric",
OValue: 2.0,
OLabels: map[string]string{"label": "value"},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got := tt.event.Expand()
if len(tt.wantEvents) != len(got) {
t.Fatalf("Expected %d events, but got %d", len(tt.wantEvents), len(got))
}
eventCount := func(events []Event) map[string]int {
counts := make(map[string]int)
for _, event := range events {
oe := event.(*ObserverEvent)
key := fmt.Sprintf("%s%f%v", oe.OMetricName, oe.OValue, oe.OLabels)
counts[key]++
}
return counts
}
wantMap := eventCount(tt.wantEvents)
gotMap := eventCount(got)
for key, count := range wantMap {
if gotMap[key] != count {
t.Fatalf("Event mismatch for key %v: expected %d, got %d", key, count, gotMap[key])
}
}
})
}
}