diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index c61ab66..4bbed4d 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -113,19 +113,24 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { metricName = mapper.EscapeMetricName(thisEvent.MetricName()) } + eventValue := thisEvent.Value() + if mapping.Scale.Set { + eventValue *= mapping.Scale.Val + } + switch ev := thisEvent.(type) { case *event.CounterEvent: // We don't accept negative values for counters. Incrementing the counter with a negative number // will cause the exporter to panic. Instead we will warn and continue to the next event. - if thisEvent.Value() < 0.0 { - level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) + if eventValue < 0.0 { + level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", eventValue) b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc() return } counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - counter.Add(thisEvent.Value()) + counter.Add(eventValue) b.EventStats.WithLabelValues("counter").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) @@ -137,9 +142,9 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { if err == nil { if ev.GRelative { - gauge.Add(thisEvent.Value()) + gauge.Add(eventValue) } else { - gauge.Set(thisEvent.Value()) + gauge.Set(eventValue) } b.EventStats.WithLabelValues("gauge").Inc() } else { @@ -160,7 +165,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { case mapper.ObserverTypeHistogram: histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - histogram.Observe(thisEvent.Value()) + histogram.Observe(eventValue) b.EventStats.WithLabelValues("observer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) @@ -170,7 +175,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary: summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - summary.Observe(thisEvent.Value()) + summary.Observe(eventValue) b.EventStats.WithLabelValues("observer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index 6e1744c..a0bdce4 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -845,6 +845,63 @@ func TestCounterIncrement(t *testing.T) { } } +func TestScaledMapping(t *testing.T) { + events := make(chan event.Events) + testMapper := mapper.MetricMapper{} + config := `mappings: +- match: foo.processed_kilobytes + name: processed_bytes + scale: 1024 + labels: + service: foo` + err := testMapper.InitFromYAMLString(config) + if err != nil { + t.Fatalf("Config load error: %s %s", config, err) + } + + // Start exporter with a synchronous channel + go func() { + ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) + }() + + // Synchronously send a statsd event to wait for handleEvent execution. + // Then close events channel to stop a listener. + statsdName := "foo.processed_kilobytes" + statsdLabels := map[string]string{} + promName := "processed_bytes" + promLabels := map[string]string{"service": "foo"} + c := event.Events{ + &event.CounterEvent{ + CMetricName: statsdName, + CValue: 100, + CLabels: statsdLabels, + }, + &event.CounterEvent{ + CMetricName: statsdName, + CValue: 200, + CLabels: statsdLabels, + }, + } + events <- c + // Push empty event so that we block until the first event is consumed. + events <- event.Events{} + close(events) + + // Check counter value + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatalf("Cannot gather from DefaultGatherer: %v", err) + } + value := getFloat64(metrics, promName, promLabels) + if value == nil { + t.Fatal("Counter value should not be nil") + } + if *value != 300*1024 { + t.Fatalf("Counter wasn't incremented properly") + } +} + type statsDPacketHandler interface { HandlePacket(packet []byte) SetEventHandler(eh event.EventHandler) diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index f387ea8..eab7439 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -35,6 +35,7 @@ type mappings []struct { ageBuckets uint32 bufCap uint32 buckets []float64 + scale MaybeFloat64 } func newTestMapperWithCache(cacheType string, size int) *MetricMapper { @@ -1480,6 +1481,54 @@ mappings: }, }, }, + { + testName: "Config with 'scale' field", + config: `mappings: +- match: grpc_server.*.*.latency_ms + name: grpc_server_handling_seconds + scale: 0.001 + labels: + grpc_service: "$1" + grpc_method: "$2"`, + mappings: mappings{ + { + statsdMetric: "test.a", + }, + { + statsdMetric: "grpc_server.Foo.Bar.latency_ms", + name: "grpc_server_handling_seconds", + scale: MaybeFloat64{Val: 0.001, Set: true}, + labels: map[string]string{ + "grpc_service": "Foo", + "grpc_method": "Bar", + }, + }, + }, + }, + { + testName: "Config with 'scale' using scientific notation", + config: `mappings: +- match: grpc_server.*.*.latency_us + name: grpc_server_handling_seconds + scale: 1e-6 + labels: + grpc_service: "$1" + grpc_method: "$2"`, + mappings: mappings{ + { + statsdMetric: "test.a", + }, + { + statsdMetric: "grpc_server.Foo.Bar.latency_us", + name: "grpc_server_handling_seconds", + scale: MaybeFloat64{Val: 1e-6, Set: true}, + labels: map[string]string{ + "grpc_service": "Foo", + "grpc_method": "Bar", + }, + }, + }, + }, } mapper := MetricMapper{} @@ -1561,6 +1610,9 @@ mappings: if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap { t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap) } + if present && mapping.scale != m.Scale { + t.Fatalf("%d.%q: Expected scale %v, got %v", i, metric, mapping.scale, m.Scale) + } } }) } diff --git a/pkg/mapper/mapping.go b/pkg/mapper/mapping.go index f3aa585..dda6423 100644 --- a/pkg/mapper/mapping.go +++ b/pkg/mapper/mapping.go @@ -41,6 +41,7 @@ type MetricMapping struct { Ttl time.Duration `yaml:"ttl"` SummaryOptions *SummaryOptions `yaml:"summary_options"` HistogramOptions *HistogramOptions `yaml:"histogram_options"` + Scale MaybeFloat64 `yaml:"scale"` } // UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys @@ -66,6 +67,7 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error { m.Ttl = tmp.Ttl m.SummaryOptions = tmp.SummaryOptions m.HistogramOptions = tmp.HistogramOptions + m.Scale = tmp.Scale // Use deprecated TimerType if necessary if tmp.ObserverType == "" { @@ -74,3 +76,25 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } + +type MaybeFloat64 struct { + Set bool + Val float64 +} + +func (m *MaybeFloat64) MarshalYAML() (interface{}, error) { + if m.Set { + return m.Val, nil + } + return nil, nil +} + +func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error { + var tmp float64 + if err := unmarshal(&tmp); err != nil { + return err + } + m.Val = tmp + m.Set = true + return nil +}