Add scale field to mapping config

This field allows configuring unit conversions in mappings. For example:

mappings:
- match: foo.latency_ms
  name: foo_latency_seconds
  scale: 0.001
- match: bar.processed_kb
  name: bar_processed_bytes
  scale: 1024

Signed-off-by: Eddie Bracho <eddiebracho@gmail.com>
This commit is contained in:
Eddie Bracho 2023-05-28 16:31:06 -07:00
parent c3752cf30f
commit fdc8b5f852
4 changed files with 145 additions and 7 deletions

View file

@ -113,19 +113,24 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
metricName = mapper.EscapeMetricName(thisEvent.MetricName()) metricName = mapper.EscapeMetricName(thisEvent.MetricName())
} }
eventValue := thisEvent.Value()
if mapping.Scale.Set {
eventValue *= mapping.Scale.Val
}
switch ev := thisEvent.(type) { switch ev := thisEvent.(type) {
case *event.CounterEvent: case *event.CounterEvent:
// We don't accept negative values for counters. Incrementing the counter with a negative number // We don't accept negative values for counters. Incrementing the counter with a negative number
// will cause the exporter to panic. Instead we will warn and continue to the next event. // will cause the exporter to panic. Instead we will warn and continue to the next event.
if thisEvent.Value() < 0.0 { if eventValue < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", eventValue)
b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc() b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return return
} }
counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount) counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
counter.Add(thisEvent.Value()) counter.Add(eventValue)
b.EventStats.WithLabelValues("counter").Inc() b.EventStats.WithLabelValues("counter").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
@ -137,9 +142,9 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
if err == nil { if err == nil {
if ev.GRelative { if ev.GRelative {
gauge.Add(thisEvent.Value()) gauge.Add(eventValue)
} else { } else {
gauge.Set(thisEvent.Value()) gauge.Set(eventValue)
} }
b.EventStats.WithLabelValues("gauge").Inc() b.EventStats.WithLabelValues("gauge").Inc()
} else { } else {
@ -160,7 +165,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
case mapper.ObserverTypeHistogram: case mapper.ObserverTypeHistogram:
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount) histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
histogram.Observe(thisEvent.Value()) histogram.Observe(eventValue)
b.EventStats.WithLabelValues("observer").Inc() b.EventStats.WithLabelValues("observer").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
@ -170,7 +175,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary: case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary:
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount) summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
summary.Observe(thisEvent.Value()) summary.Observe(eventValue)
b.EventStats.WithLabelValues("observer").Inc() b.EventStats.WithLabelValues("observer").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)

View file

@ -845,6 +845,63 @@ func TestCounterIncrement(t *testing.T) {
} }
} }
// TestScaledMapping verifies that a mapping's `scale` field multiplies
// incoming statsd event values before they are applied to the Prometheus
// metric: two counter increments of 100 and 200 with scale 1024 must
// yield a final counter value of 300*1024.
func TestScaledMapping(t *testing.T) {
	events := make(chan event.Events)
	testMapper := mapper.MetricMapper{}
	// NOTE(review): indentation inside this raw string is YAML-significant;
	// reconstructed here as the only nesting consistent with the labels
	// asserted below — confirm against the committed file.
	config := `mappings:
- match: foo.processed_kilobytes
  name: processed_bytes
  scale: 1024
  labels:
    service: foo`
	err := testMapper.InitFromYAMLString(config)
	if err != nil {
		t.Fatalf("Config load error: %s %s", config, err)
	}
	// Start exporter with a synchronous channel
	go func() {
		ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
		ex.Listen(events)
	}()
	// Synchronously send a statsd event to wait for handleEvent execution.
	// Then close events channel to stop a listener.
	statsdName := "foo.processed_kilobytes"
	statsdLabels := map[string]string{}
	promName := "processed_bytes"
	promLabels := map[string]string{"service": "foo"}
	// Two increments so the test distinguishes "scaled sum" from a single
	// scaled observation.
	c := event.Events{
		&event.CounterEvent{
			CMetricName: statsdName,
			CValue:      100,
			CLabels:     statsdLabels,
		},
		&event.CounterEvent{
			CMetricName: statsdName,
			CValue:      200,
			CLabels:     statsdLabels,
		},
	}
	events <- c
	// Push empty event so that we block until the first event is consumed.
	events <- event.Events{}
	close(events)
	// Check counter value
	metrics, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
	}
	value := getFloat64(metrics, promName, promLabels)
	if value == nil {
		t.Fatal("Counter value should not be nil")
	}
	// (100 + 200) * 1024: scale is applied per event before Add.
	if *value != 300*1024 {
		t.Fatalf("Counter wasn't incremented properly")
	}
}
type statsDPacketHandler interface { type statsDPacketHandler interface {
HandlePacket(packet []byte) HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler) SetEventHandler(eh event.EventHandler)

View file

@ -35,6 +35,7 @@ type mappings []struct {
ageBuckets uint32 ageBuckets uint32
bufCap uint32 bufCap uint32
buckets []float64 buckets []float64
scale MaybeFloat64
} }
func newTestMapperWithCache(cacheType string, size int) *MetricMapper { func newTestMapperWithCache(cacheType string, size int) *MetricMapper {
@ -1480,6 +1481,54 @@ mappings:
}, },
}, },
}, },
{
testName: "Config with 'scale' field",
config: `mappings:
- match: grpc_server.*.*.latency_ms
name: grpc_server_handling_seconds
scale: 0.001
labels:
grpc_service: "$1"
grpc_method: "$2"`,
mappings: mappings{
{
statsdMetric: "test.a",
},
{
statsdMetric: "grpc_server.Foo.Bar.latency_ms",
name: "grpc_server_handling_seconds",
scale: MaybeFloat64{Val: 0.001, Set: true},
labels: map[string]string{
"grpc_service": "Foo",
"grpc_method": "Bar",
},
},
},
},
{
testName: "Config with 'scale' using scientific notation",
config: `mappings:
- match: grpc_server.*.*.latency_us
name: grpc_server_handling_seconds
scale: 1e-6
labels:
grpc_service: "$1"
grpc_method: "$2"`,
mappings: mappings{
{
statsdMetric: "test.a",
},
{
statsdMetric: "grpc_server.Foo.Bar.latency_us",
name: "grpc_server_handling_seconds",
scale: MaybeFloat64{Val: 1e-6, Set: true},
labels: map[string]string{
"grpc_service": "Foo",
"grpc_method": "Bar",
},
},
},
},
} }
mapper := MetricMapper{} mapper := MetricMapper{}
@ -1561,6 +1610,9 @@ mappings:
if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap { if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap {
t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap) t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap)
} }
if present && mapping.scale != m.Scale {
t.Fatalf("%d.%q: Expected scale %v, got %v", i, metric, mapping.scale, m.Scale)
}
} }
}) })
} }

View file

@ -41,6 +41,7 @@ type MetricMapping struct {
Ttl time.Duration `yaml:"ttl"` Ttl time.Duration `yaml:"ttl"`
SummaryOptions *SummaryOptions `yaml:"summary_options"` SummaryOptions *SummaryOptions `yaml:"summary_options"`
HistogramOptions *HistogramOptions `yaml:"histogram_options"` HistogramOptions *HistogramOptions `yaml:"histogram_options"`
Scale MaybeFloat64 `yaml:"scale"`
} }
// UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys // UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys
@ -66,6 +67,7 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error {
m.Ttl = tmp.Ttl m.Ttl = tmp.Ttl
m.SummaryOptions = tmp.SummaryOptions m.SummaryOptions = tmp.SummaryOptions
m.HistogramOptions = tmp.HistogramOptions m.HistogramOptions = tmp.HistogramOptions
m.Scale = tmp.Scale
// Use deprecated TimerType if necessary // Use deprecated TimerType if necessary
if tmp.ObserverType == "" { if tmp.ObserverType == "" {
@ -74,3 +76,25 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil return nil
} }
// MaybeFloat64 is an optional float64 for YAML configuration fields.
// Set reports whether a value was present in the config, which lets
// callers distinguish "field absent" from an explicit 0.
type MaybeFloat64 struct {
	// Set is true only after a value has been unmarshaled.
	Set bool
	// Val is meaningful only when Set is true.
	Val float64
}
// MarshalYAML implements yaml.Marshaler. A set value marshals as its
// plain float64; an unset value marshals as nil so the field is emitted
// as null/omitted rather than a spurious 0.
func (m *MaybeFloat64) MarshalYAML() (interface{}, error) {
	if !m.Set {
		return nil, nil
	}
	return m.Val, nil
}
// UnmarshalYAML implements yaml.Unmarshaler. Any scalar that decodes as
// a float64 is accepted; on success the value is recorded and Set is
// flipped to true so callers can tell the field was present.
func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var parsed float64
	err := unmarshal(&parsed)
	if err != nil {
		return err
	}
	m.Set = true
	m.Val = parsed
	return nil
}