diff --git a/README.md b/README.md index b1fc71a..2285594 100644 --- a/README.md +++ b/README.md @@ -577,6 +577,23 @@ metrics that do not expire. expire a metric only by changing the mapping configuration. At least one sample must be received for updated mappings to take effect. +### Unit conversions + +The `scale` parameter can be used to define unit conversions for metric values. The value is a floating point number to scale metric values by. This can be useful for converting non-base units (e.g. milliseconds, kilobytes) to base units (e.g. seconds, bytes) as recommended in [prometheus best practices](https://prometheus.io/docs/practices/naming/). + +```yaml +mappings: +- match: foo.latency_ms + name: foo_latency_seconds + scale: 0.001 +- match: bar.processed_kb + name: bar_processed_bytes + scale: 1024 +- match: baz.latency_us + name: baz_latency_seconds + scale: 1e-6 +``` + ### Event flushing configuration Internally `statsd_exporter` runs a goroutine for each network listener (UDP, TCP & Unix Socket). These each receive and parse metrics received into an event. For performance purposes, these events are queued internally and flushed to the main exporter goroutine periodically in batches. The size of this queue and the flush criteria can be tuned with the `--statsd.event-queue-size`, `--statsd.event-flush-threshold` and `--statsd.event-flush-interval`. However, the defaults should perform well even for very high traffic environments. diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index c61ab66..4bbed4d 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -113,19 +113,24 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { metricName = mapper.EscapeMetricName(thisEvent.MetricName()) } + eventValue := thisEvent.Value() + if mapping.Scale.Set { + eventValue *= mapping.Scale.Val + } + switch ev := thisEvent.(type) { case *event.CounterEvent: // We don't accept negative values for counters. Incrementing the counter with a negative number // will cause the exporter to panic. Instead we will warn and continue to the next event. - if thisEvent.Value() < 0.0 { - level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) + if eventValue < 0.0 { + level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", eventValue) b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc() return } counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - counter.Add(thisEvent.Value()) + counter.Add(eventValue) b.EventStats.WithLabelValues("counter").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) @@ -137,9 +142,9 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { if err == nil { if ev.GRelative { - gauge.Add(thisEvent.Value()) + gauge.Add(eventValue) } else { - gauge.Set(thisEvent.Value()) + gauge.Set(eventValue) } b.EventStats.WithLabelValues("gauge").Inc() } else { @@ -160,7 +165,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { case mapper.ObserverTypeHistogram: histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - histogram.Observe(thisEvent.Value()) + histogram.Observe(eventValue) b.EventStats.WithLabelValues("observer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) @@ -170,7 +175,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) { case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary: summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount) if err == nil { - summary.Observe(thisEvent.Value()) + summary.Observe(eventValue) b.EventStats.WithLabelValues("observer").Inc() } else { level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index 6e1744c..a0bdce4 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -845,6 +845,63 @@ func TestCounterIncrement(t *testing.T) { } } +func TestScaledMapping(t *testing.T) { + events := make(chan event.Events) + testMapper := mapper.MetricMapper{} + config := `mappings: +- match: foo.processed_kilobytes + name: processed_bytes + scale: 1024 + labels: + service: foo` + err := testMapper.InitFromYAMLString(config) + if err != nil { + t.Fatalf("Config load error: %s %s", config, err) + } + + // Start exporter with a synchronous channel + go func() { + ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) + }() + + // Synchronously send a statsd event to wait for handleEvent execution. + // Then close events channel to stop a listener. + statsdName := "foo.processed_kilobytes" + statsdLabels := map[string]string{} + promName := "processed_bytes" + promLabels := map[string]string{"service": "foo"} + c := event.Events{ + &event.CounterEvent{ + CMetricName: statsdName, + CValue: 100, + CLabels: statsdLabels, + }, + &event.CounterEvent{ + CMetricName: statsdName, + CValue: 200, + CLabels: statsdLabels, + }, + } + events <- c + // Push empty event so that we block until the first event is consumed. + events <- event.Events{} + close(events) + + // Check counter value + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatalf("Cannot gather from DefaultGatherer: %v", err) + } + value := getFloat64(metrics, promName, promLabels) + if value == nil { + t.Fatal("Counter value should not be nil") + } + if *value != 300*1024 { + t.Fatalf("Counter wasn't incremented properly") + } +} + type statsDPacketHandler interface { HandlePacket(packet []byte) SetEventHandler(eh event.EventHandler) diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index f387ea8..eab7439 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -35,6 +35,7 @@ type mappings []struct { ageBuckets uint32 bufCap uint32 buckets []float64 + scale MaybeFloat64 } func newTestMapperWithCache(cacheType string, size int) *MetricMapper { @@ -1480,6 +1481,54 @@ mappings: }, }, }, + { + testName: "Config with 'scale' field", + config: `mappings: +- match: grpc_server.*.*.latency_ms + name: grpc_server_handling_seconds + scale: 0.001 + labels: + grpc_service: "$1" + grpc_method: "$2"`, + mappings: mappings{ + { + statsdMetric: "test.a", + }, + { + statsdMetric: "grpc_server.Foo.Bar.latency_ms", + name: "grpc_server_handling_seconds", + scale: MaybeFloat64{Val: 0.001, Set: true}, + labels: map[string]string{ + "grpc_service": "Foo", + "grpc_method": "Bar", + }, + }, + }, + }, + { + testName: "Config with 'scale' using scientific notation", + config: `mappings: +- match: grpc_server.*.*.latency_us + name: grpc_server_handling_seconds + scale: 1e-6 + labels: + grpc_service: "$1" + grpc_method: "$2"`, + mappings: mappings{ + { + statsdMetric: "test.a", + }, + { + statsdMetric: "grpc_server.Foo.Bar.latency_us", + name: "grpc_server_handling_seconds", + scale: MaybeFloat64{Val: 1e-6, Set: true}, + labels: map[string]string{ + "grpc_service": "Foo", + "grpc_method": "Bar", + }, + }, + }, + }, } mapper := MetricMapper{} @@ -1561,6 +1610,9 @@ mappings: if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap { t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap) } + if present && mapping.scale != m.Scale { + t.Fatalf("%d.%q: Expected scale %v, got %v", i, metric, mapping.scale, m.Scale) + } } }) } diff --git a/pkg/mapper/mapping.go b/pkg/mapper/mapping.go index f3aa585..dda6423 100644 --- a/pkg/mapper/mapping.go +++ b/pkg/mapper/mapping.go @@ -41,6 +41,7 @@ type MetricMapping struct { Ttl time.Duration `yaml:"ttl"` SummaryOptions *SummaryOptions `yaml:"summary_options"` HistogramOptions *HistogramOptions `yaml:"histogram_options"` + Scale MaybeFloat64 `yaml:"scale"` } // UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys @@ -66,6 +67,7 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error { m.Ttl = tmp.Ttl m.SummaryOptions = tmp.SummaryOptions m.HistogramOptions = tmp.HistogramOptions + m.Scale = tmp.Scale // Use deprecated TimerType if necessary if tmp.ObserverType == "" { @@ -74,3 +76,25 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } + +type MaybeFloat64 struct { + Set bool + Val float64 +} + +func (m *MaybeFloat64) MarshalYAML() (interface{}, error) { + if m.Set { + return m.Val, nil + } + return nil, nil +} + +func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error { + var tmp float64 + if err := unmarshal(&tmp); err != nil { + return err + } + m.Val = tmp + m.Set = true + return nil +}