forked from mirrors/statsd_exporter
Support datadog extensions to statsd datagrams
Datadog's format extensions allow attaching labels and tags to statsd metrics, and are documented at http://docs.datadoghq.com/guides/dogstatsd/#datagram-format Additionally changes compare method in tests to reflect.DeepEqual, as the tag maps don't sort deterministically, corrects an inversion in displaying test failure cases, and treats an invalid sample value as 1 instead of throwing out the sample.
This commit is contained in:
parent
e9eb193ff9
commit
ab7c27ee77
2 changed files with 106 additions and 39 deletions
|
@ -14,7 +14,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@ func TestHandlePacket(t *testing.T) {
|
||||||
&CounterEvent{
|
&CounterEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 2,
|
value: 2,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
|
@ -42,6 +43,7 @@ func TestHandlePacket(t *testing.T) {
|
||||||
&GaugeEvent{
|
&GaugeEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 3,
|
value: 3,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
|
@ -51,6 +53,27 @@ func TestHandlePacket(t *testing.T) {
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 200,
|
value: 200,
|
||||||
|
labels: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
name: "datadog tag extension",
|
||||||
|
in: "foo:100|c|#tag1:bar,tag2:baz,tag3,tag4",
|
||||||
|
out: Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: "foo",
|
||||||
|
value: 100,
|
||||||
|
labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
name: "datadog tag extension with sampling",
|
||||||
|
in: "foo:100|c|@0.1|#tag1:bar,tag2,tag3:baz",
|
||||||
|
out: Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: "foo",
|
||||||
|
value: 1000,
|
||||||
|
labels: map[string]string{"tag1": "bar", "tag2": ".", "tag3": "baz"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
|
@ -60,26 +83,32 @@ func TestHandlePacket(t *testing.T) {
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 200,
|
value: 200,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 300,
|
value: 300,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
&CounterEvent{
|
&CounterEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 50,
|
value: 50,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
&GaugeEvent{
|
&GaugeEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 6,
|
value: 6,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
&CounterEvent{
|
&CounterEvent{
|
||||||
metricName: "bar",
|
metricName: "bar",
|
||||||
value: 1,
|
value: 1,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
&TimerEvent{
|
&TimerEvent{
|
||||||
metricName: "bar",
|
metricName: "bar",
|
||||||
value: 5,
|
value: 5,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
|
@ -94,6 +123,13 @@ func TestHandlePacket(t *testing.T) {
|
||||||
}, {
|
}, {
|
||||||
name: "illegal sampling factor",
|
name: "illegal sampling factor",
|
||||||
in: "foo:1|c|@bar",
|
in: "foo:1|c|@bar",
|
||||||
|
out: Events{
|
||||||
|
&CounterEvent{
|
||||||
|
metricName: "foo",
|
||||||
|
value: 1,
|
||||||
|
labels: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
}, {
|
}, {
|
||||||
name: "zero sampling factor",
|
name: "zero sampling factor",
|
||||||
in: "foo:2|c|@0",
|
in: "foo:2|c|@0",
|
||||||
|
@ -101,6 +137,7 @@ func TestHandlePacket(t *testing.T) {
|
||||||
&CounterEvent{
|
&CounterEvent{
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
value: 2,
|
value: 2,
|
||||||
|
labels: map[string]string{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
|
@ -121,12 +158,12 @@ func TestHandlePacket(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(actual) != len(scenario.out) {
|
if len(actual) != len(scenario.out) {
|
||||||
t.Fatalf("%d. Expected %d events, got %d", i, len(scenario.out), len(actual))
|
t.Fatalf("%d. Expected %d events, got %d in scenario '%s'", i, len(scenario.out), len(actual), scenario.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
for j, expected := range scenario.out {
|
for j, expected := range scenario.out {
|
||||||
if fmt.Sprintf("%v", actual[j]) != fmt.Sprintf("%v", expected) {
|
if !reflect.DeepEqual(&expected, &actual[j]) {
|
||||||
t.Fatalf("%d.%d. Expected %v, got %v", i, j, actual[j], expected)
|
t.Fatalf("%d.%d. Expected %#v, got %#v in scenario '%s'", i, j, expected, actual[j], scenario.name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
100
exporter.go
100
exporter.go
|
@ -143,31 +143,38 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels) prom
|
||||||
type Event interface {
|
type Event interface {
|
||||||
MetricName() string
|
MetricName() string
|
||||||
Value() float64
|
Value() float64
|
||||||
|
Labels() map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
type CounterEvent struct {
|
type CounterEvent struct {
|
||||||
metricName string
|
metricName string
|
||||||
value float64
|
value float64
|
||||||
|
labels map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CounterEvent) MetricName() string { return c.metricName }
|
func (c *CounterEvent) MetricName() string { return c.metricName }
|
||||||
func (c *CounterEvent) Value() float64 { return c.value }
|
func (c *CounterEvent) Value() float64 { return c.value }
|
||||||
|
func (c *CounterEvent) Labels() map[string]string { return c.labels }
|
||||||
|
|
||||||
type GaugeEvent struct {
|
type GaugeEvent struct {
|
||||||
metricName string
|
metricName string
|
||||||
value float64
|
value float64
|
||||||
|
labels map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GaugeEvent) MetricName() string { return g.metricName }
|
func (g *GaugeEvent) MetricName() string { return g.metricName }
|
||||||
func (g *GaugeEvent) Value() float64 { return g.value }
|
func (g *GaugeEvent) Value() float64 { return g.value }
|
||||||
|
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
|
||||||
|
|
||||||
type TimerEvent struct {
|
type TimerEvent struct {
|
||||||
metricName string
|
metricName string
|
||||||
value float64
|
value float64
|
||||||
|
labels map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimerEvent) MetricName() string { return t.metricName }
|
func (t *TimerEvent) MetricName() string { return t.metricName }
|
||||||
func (t *TimerEvent) Value() float64 { return t.value }
|
func (t *TimerEvent) Value() float64 { return t.value }
|
||||||
|
func (c *TimerEvent) Labels() map[string]string { return c.labels }
|
||||||
|
|
||||||
type Events []Event
|
type Events []Event
|
||||||
|
|
||||||
|
@ -194,7 +201,7 @@ func (b *Exporter) Listen(e <-chan Events) {
|
||||||
events := <-e
|
events := <-e
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
metricName := ""
|
metricName := ""
|
||||||
prometheusLabels := prometheus.Labels{}
|
prometheusLabels := event.Labels()
|
||||||
|
|
||||||
labels, present := b.mapper.getMapping(event.MetricName())
|
labels, present := b.mapper.getMapping(event.MetricName())
|
||||||
if present {
|
if present {
|
||||||
|
@ -257,22 +264,25 @@ type StatsDListener struct {
|
||||||
conn *net.UDPConn
|
conn *net.UDPConn
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildEvent(statType, metric string, value float64) (Event, error) {
|
func buildEvent(statType, metric string, value float64, labels map[string]string) (Event, error) {
|
||||||
switch statType {
|
switch statType {
|
||||||
case "c":
|
case "c":
|
||||||
return &CounterEvent{
|
return &CounterEvent{
|
||||||
metricName: metric,
|
metricName: metric,
|
||||||
value: float64(value),
|
value: float64(value),
|
||||||
|
labels: labels,
|
||||||
}, nil
|
}, nil
|
||||||
case "g":
|
case "g":
|
||||||
return &GaugeEvent{
|
return &GaugeEvent{
|
||||||
metricName: metric,
|
metricName: metric,
|
||||||
value: float64(value),
|
value: float64(value),
|
||||||
|
labels: labels,
|
||||||
}, nil
|
}, nil
|
||||||
case "ms":
|
case "ms", "h":
|
||||||
return &TimerEvent{
|
return &TimerEvent{
|
||||||
metricName: metric,
|
metricName: metric,
|
||||||
value: float64(value),
|
value: float64(value),
|
||||||
|
labels: labels,
|
||||||
}, nil
|
}, nil
|
||||||
case "s":
|
case "s":
|
||||||
return nil, fmt.Errorf("No support for StatsD sets")
|
return nil, fmt.Errorf("No support for StatsD sets")
|
||||||
|
@ -301,23 +311,30 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
elements := strings.Split(line, ":")
|
elements := strings.SplitN(line, ":", 2)
|
||||||
if len(elements) < 2 {
|
if len(elements) < 2 {
|
||||||
networkStats.WithLabelValues("malformed_line").Inc()
|
networkStats.WithLabelValues("malformed_line").Inc()
|
||||||
log.Println("Bad line from StatsD:", line)
|
log.Println("Bad line from StatsD:", line)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
metric := elements[0]
|
metric := elements[0]
|
||||||
samples := elements[1:]
|
var samples []string
|
||||||
|
if strings.Contains(elements[1], "|#") {
|
||||||
|
// using datadog extensions, disable multi-metrics
|
||||||
|
samples = elements[1:]
|
||||||
|
} else {
|
||||||
|
samples = strings.Split(elements[1], ":")
|
||||||
|
}
|
||||||
for _, sample := range samples {
|
for _, sample := range samples {
|
||||||
components := strings.Split(sample, "|")
|
components := strings.Split(sample, "|")
|
||||||
samplingFactor := 1.0
|
samplingFactor := 1.0
|
||||||
if len(components) < 2 || len(components) > 3 {
|
if len(components) < 2 || len(components) > 4 {
|
||||||
networkStats.WithLabelValues("malformed_component").Inc()
|
networkStats.WithLabelValues("malformed_component").Inc()
|
||||||
log.Println("Bad component on line:", line)
|
log.Println("Bad component on line:", line)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
valueStr, statType := components[0], components[1]
|
valueStr, statType := components[0], components[1]
|
||||||
|
labels := map[string]string{}
|
||||||
value, err := strconv.ParseFloat(valueStr, 64)
|
value, err := strconv.ParseFloat(valueStr, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Bad value %s on line: %s", valueStr, line)
|
log.Printf("Bad value %s on line: %s", valueStr, line)
|
||||||
|
@ -325,32 +342,45 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(components) == 3 {
|
if len(components) >= 3 {
|
||||||
if statType != "c" {
|
for _, component := range components[2:] {
|
||||||
log.Println("Illegal sampling factor for non-counter metric on line", line)
|
switch component[0] {
|
||||||
networkStats.WithLabelValues("illegal_sample_factor").Inc()
|
case '@':
|
||||||
|
if statType != "c" {
|
||||||
|
log.Println("Illegal sampling factor for non-counter metric on line", line)
|
||||||
|
networkStats.WithLabelValues("illegal_sample_factor").Inc()
|
||||||
|
}
|
||||||
|
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Invalid sampling factor %s on line %s", component[1:], line)
|
||||||
|
networkStats.WithLabelValues("invalid_sample_factor").Inc()
|
||||||
|
}
|
||||||
|
if samplingFactor == 0 {
|
||||||
|
samplingFactor = 1
|
||||||
|
}
|
||||||
|
value /= samplingFactor
|
||||||
|
case '#':
|
||||||
|
networkStats.WithLabelValues("dogstasd_tags").Inc()
|
||||||
|
tags := strings.Split(component[1:], ",")
|
||||||
|
for _, t := range tags {
|
||||||
|
kv := strings.Split(t, ":")
|
||||||
|
if len(kv) == 2 {
|
||||||
|
if len(kv[1]) > 0 {
|
||||||
|
labels[kv[0]] = kv[1]
|
||||||
|
}
|
||||||
|
} else if len(kv) == 1 {
|
||||||
|
labels[kv[0]] = "."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Printf("Invalid sampling factor or tag section %s on line %s", components[2], line)
|
||||||
|
networkStats.WithLabelValues("invalid_sample_factor").Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
samplingStr := components[2]
|
|
||||||
if samplingStr[0] != '@' {
|
|
||||||
log.Printf("Invalid sampling factor %s on line %s", samplingStr, line)
|
|
||||||
networkStats.WithLabelValues("invalid_sample_factor").Inc()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Invalid sampling factor %s on line %s", samplingStr, line)
|
|
||||||
networkStats.WithLabelValues("invalid_sample_factor").Inc()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if samplingFactor == 0 {
|
|
||||||
// This should never happen, but avoid division by zero if it does.
|
|
||||||
log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line)
|
|
||||||
samplingFactor = 1
|
|
||||||
}
|
|
||||||
value /= samplingFactor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
event, err := buildEvent(statType, metric, value)
|
event, err := buildEvent(statType, metric, value, labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error building event on line %s: %s", line, err)
|
log.Printf("Error building event on line %s: %s", line, err)
|
||||||
networkStats.WithLabelValues("illegal_event").Inc()
|
networkStats.WithLabelValues("illegal_event").Inc()
|
||||||
|
|
Loading…
Reference in a new issue