mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-12-29 17:00:28 +00:00
Merge pull request #499 from ebracho/master
Add `scale` field to mapping config
This commit is contained in:
commit
80e119a781
5 changed files with 162 additions and 7 deletions
17
README.md
17
README.md
|
@ -577,6 +577,23 @@ metrics that do not expire.
|
|||
expire a metric only by changing the mapping configuration. At least one
|
||||
sample must be received for updated mappings to take effect.
|
||||
|
||||
### Unit conversions
|
||||
|
||||
The `scale` parameter can be used to define unit conversions for metric values. The value is a floating point number to scale metric values by. This can be useful for converting non-base units (e.g. milliseconds, kilobytes) to base units (e.g. seconds, bytes) as recommended in [prometheus best practices](https://prometheus.io/docs/practices/naming/).
|
||||
|
||||
```yaml
|
||||
mappings:
|
||||
- match: foo.latency_ms
|
||||
name: foo_latency_seconds
|
||||
scale: 0.001
|
||||
- match: bar.processed_kb
|
||||
name: bar_processed_bytes
|
||||
scale: 1024
|
||||
- match: baz.latency_us
|
||||
name: baz_latency_seconds
|
||||
scale: 1e-6
|
||||
```
|
||||
|
||||
### Event flushing configuration
|
||||
|
||||
Internally `statsd_exporter` runs a goroutine for each network listener (UDP, TCP & Unix Socket). These each receive and parse metrics received into an event. For performance purposes, these events are queued internally and flushed to the main exporter goroutine periodically in batches. The size of this queue and the flush criteria can be tuned with the `--statsd.event-queue-size`, `--statsd.event-flush-threshold` and `--statsd.event-flush-interval`. However, the defaults should perform well even for very high traffic environments.
|
||||
|
|
|
@ -113,19 +113,24 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
|
|||
metricName = mapper.EscapeMetricName(thisEvent.MetricName())
|
||||
}
|
||||
|
||||
eventValue := thisEvent.Value()
|
||||
if mapping.Scale.Set {
|
||||
eventValue *= mapping.Scale.Val
|
||||
}
|
||||
|
||||
switch ev := thisEvent.(type) {
|
||||
case *event.CounterEvent:
|
||||
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
||||
// will cause the exporter to panic. Instead we will warn and continue to the next event.
|
||||
if thisEvent.Value() < 0.0 {
|
||||
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value())
|
||||
if eventValue < 0.0 {
|
||||
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", eventValue)
|
||||
b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
|
||||
return
|
||||
}
|
||||
|
||||
counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount)
|
||||
if err == nil {
|
||||
counter.Add(thisEvent.Value())
|
||||
counter.Add(eventValue)
|
||||
b.EventStats.WithLabelValues("counter").Inc()
|
||||
} else {
|
||||
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
|
||||
|
@ -137,9 +142,9 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
|
|||
|
||||
if err == nil {
|
||||
if ev.GRelative {
|
||||
gauge.Add(thisEvent.Value())
|
||||
gauge.Add(eventValue)
|
||||
} else {
|
||||
gauge.Set(thisEvent.Value())
|
||||
gauge.Set(eventValue)
|
||||
}
|
||||
b.EventStats.WithLabelValues("gauge").Inc()
|
||||
} else {
|
||||
|
@ -160,7 +165,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
|
|||
case mapper.ObserverTypeHistogram:
|
||||
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount)
|
||||
if err == nil {
|
||||
histogram.Observe(thisEvent.Value())
|
||||
histogram.Observe(eventValue)
|
||||
b.EventStats.WithLabelValues("observer").Inc()
|
||||
} else {
|
||||
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
|
||||
|
@ -170,7 +175,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
|
|||
case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary:
|
||||
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount)
|
||||
if err == nil {
|
||||
summary.Observe(thisEvent.Value())
|
||||
summary.Observe(eventValue)
|
||||
b.EventStats.WithLabelValues("observer").Inc()
|
||||
} else {
|
||||
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
|
||||
|
|
|
@ -845,6 +845,63 @@ func TestCounterIncrement(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScaledMapping(t *testing.T) {
|
||||
events := make(chan event.Events)
|
||||
testMapper := mapper.MetricMapper{}
|
||||
config := `mappings:
|
||||
- match: foo.processed_kilobytes
|
||||
name: processed_bytes
|
||||
scale: 1024
|
||||
labels:
|
||||
service: foo`
|
||||
err := testMapper.InitFromYAMLString(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Config load error: %s %s", config, err)
|
||||
}
|
||||
|
||||
// Start exporter with a synchronous channel
|
||||
go func() {
|
||||
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
|
||||
ex.Listen(events)
|
||||
}()
|
||||
|
||||
// Synchronously send a statsd event to wait for handleEvent execution.
|
||||
// Then close events channel to stop a listener.
|
||||
statsdName := "foo.processed_kilobytes"
|
||||
statsdLabels := map[string]string{}
|
||||
promName := "processed_bytes"
|
||||
promLabels := map[string]string{"service": "foo"}
|
||||
c := event.Events{
|
||||
&event.CounterEvent{
|
||||
CMetricName: statsdName,
|
||||
CValue: 100,
|
||||
CLabels: statsdLabels,
|
||||
},
|
||||
&event.CounterEvent{
|
||||
CMetricName: statsdName,
|
||||
CValue: 200,
|
||||
CLabels: statsdLabels,
|
||||
},
|
||||
}
|
||||
events <- c
|
||||
// Push empty event so that we block until the first event is consumed.
|
||||
events <- event.Events{}
|
||||
close(events)
|
||||
|
||||
// Check counter value
|
||||
metrics, err := prometheus.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
|
||||
}
|
||||
value := getFloat64(metrics, promName, promLabels)
|
||||
if value == nil {
|
||||
t.Fatal("Counter value should not be nil")
|
||||
}
|
||||
if *value != 300*1024 {
|
||||
t.Fatalf("Counter wasn't incremented properly")
|
||||
}
|
||||
}
|
||||
|
||||
type statsDPacketHandler interface {
|
||||
HandlePacket(packet []byte)
|
||||
SetEventHandler(eh event.EventHandler)
|
||||
|
|
|
@ -35,6 +35,7 @@ type mappings []struct {
|
|||
ageBuckets uint32
|
||||
bufCap uint32
|
||||
buckets []float64
|
||||
scale MaybeFloat64
|
||||
}
|
||||
|
||||
func newTestMapperWithCache(cacheType string, size int) *MetricMapper {
|
||||
|
@ -1480,6 +1481,54 @@ mappings:
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
testName: "Config with 'scale' field",
|
||||
config: `mappings:
|
||||
- match: grpc_server.*.*.latency_ms
|
||||
name: grpc_server_handling_seconds
|
||||
scale: 0.001
|
||||
labels:
|
||||
grpc_service: "$1"
|
||||
grpc_method: "$2"`,
|
||||
mappings: mappings{
|
||||
{
|
||||
statsdMetric: "test.a",
|
||||
},
|
||||
{
|
||||
statsdMetric: "grpc_server.Foo.Bar.latency_ms",
|
||||
name: "grpc_server_handling_seconds",
|
||||
scale: MaybeFloat64{Val: 0.001, Set: true},
|
||||
labels: map[string]string{
|
||||
"grpc_service": "Foo",
|
||||
"grpc_method": "Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
testName: "Config with 'scale' using scientific notation",
|
||||
config: `mappings:
|
||||
- match: grpc_server.*.*.latency_us
|
||||
name: grpc_server_handling_seconds
|
||||
scale: 1e-6
|
||||
labels:
|
||||
grpc_service: "$1"
|
||||
grpc_method: "$2"`,
|
||||
mappings: mappings{
|
||||
{
|
||||
statsdMetric: "test.a",
|
||||
},
|
||||
{
|
||||
statsdMetric: "grpc_server.Foo.Bar.latency_us",
|
||||
name: "grpc_server_handling_seconds",
|
||||
scale: MaybeFloat64{Val: 1e-6, Set: true},
|
||||
labels: map[string]string{
|
||||
"grpc_service": "Foo",
|
||||
"grpc_method": "Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mapper := MetricMapper{}
|
||||
|
@ -1561,6 +1610,9 @@ mappings:
|
|||
if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap {
|
||||
t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap)
|
||||
}
|
||||
if present && mapping.scale != m.Scale {
|
||||
t.Fatalf("%d.%q: Expected scale %v, got %v", i, metric, mapping.scale, m.Scale)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ type MetricMapping struct {
|
|||
Ttl time.Duration `yaml:"ttl"`
|
||||
SummaryOptions *SummaryOptions `yaml:"summary_options"`
|
||||
HistogramOptions *HistogramOptions `yaml:"histogram_options"`
|
||||
Scale MaybeFloat64 `yaml:"scale"`
|
||||
}
|
||||
|
||||
// UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys
|
||||
|
@ -66,6 +67,7 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
m.Ttl = tmp.Ttl
|
||||
m.SummaryOptions = tmp.SummaryOptions
|
||||
m.HistogramOptions = tmp.HistogramOptions
|
||||
m.Scale = tmp.Scale
|
||||
|
||||
// Use deprecated TimerType if necessary
|
||||
if tmp.ObserverType == "" {
|
||||
|
@ -74,3 +76,25 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
type MaybeFloat64 struct {
|
||||
Set bool
|
||||
Val float64
|
||||
}
|
||||
|
||||
func (m *MaybeFloat64) MarshalYAML() (interface{}, error) {
|
||||
if m.Set {
|
||||
return m.Val, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
var tmp float64
|
||||
if err := unmarshal(&tmp); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Val = tmp
|
||||
m.Set = true
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue