From ab7c27ee773274081fe1b12e1389207a7777d82f Mon Sep 17 00:00:00 2001 From: Peter Woodman Date: Tue, 4 Nov 2014 11:44:59 +0000 Subject: [PATCH 1/3] Support datadog extensions to statsd datagrams Datadog's format extensions allow attaching labels and tags to statsd metrics, and are documented at http://docs.datadoghq.com/guides/dogstatsd/#datagram-format Additionally changes compare method in tests to reflect.DeepEqual, as the tag maps don't sort deterministically, corrects an inversion in displaying test failure cases, and treats an invalid sample value as 1 instead of throwing out the sample. --- bridge_test.go | 45 ++++++++++++++++++++-- exporter.go | 100 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 106 insertions(+), 39 deletions(-) diff --git a/bridge_test.go b/bridge_test.go index 21e283f..8864a76 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -14,7 +14,7 @@ package main import ( - "fmt" + "reflect" "testing" ) @@ -33,6 +33,7 @@ func TestHandlePacket(t *testing.T) { &CounterEvent{ metricName: "foo", value: 2, + labels: map[string]string{}, }, }, }, { @@ -42,6 +43,7 @@ func TestHandlePacket(t *testing.T) { &GaugeEvent{ metricName: "foo", value: 3, + labels: map[string]string{}, }, }, }, { @@ -51,6 +53,27 @@ func TestHandlePacket(t *testing.T) { &TimerEvent{ metricName: "foo", value: 200, + labels: map[string]string{}, + }, + }, + }, { + name: "datadog tag extension", + in: "foo:100|c|#tag1:bar,tag2:baz,tag3,tag4", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."}, + }, + }, + }, { + name: "datadog tag extension with sampling", + in: "foo:100|c|@0.1|#tag1:bar,tag2,tag3:baz", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 1000, + labels: map[string]string{"tag1": "bar", "tag2": ".", "tag3": "baz"}, }, }, }, { @@ -60,26 +83,32 @@ func TestHandlePacket(t *testing.T) { &TimerEvent{ metricName: "foo", value: 200, + labels: map[string]string{}, }, &TimerEvent{ metricName: "foo", value: 300, + labels: map[string]string{}, }, &CounterEvent{ metricName: "foo", value: 50, + labels: map[string]string{}, }, &GaugeEvent{ metricName: "foo", value: 6, + labels: map[string]string{}, }, &CounterEvent{ metricName: "bar", value: 1, + labels: map[string]string{}, }, &TimerEvent{ metricName: "bar", value: 5, + labels: map[string]string{}, }, }, }, { @@ -94,6 +123,13 @@ func TestHandlePacket(t *testing.T) { }, { name: "illegal sampling factor", in: "foo:1|c|@bar", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 1, + labels: map[string]string{}, + }, + }, }, { name: "zero sampling factor", in: "foo:2|c|@0", @@ -101,6 +137,7 @@ func TestHandlePacket(t *testing.T) { &CounterEvent{ metricName: "foo", value: 2, + labels: map[string]string{}, }, }, }, { @@ -121,12 +158,12 @@ func TestHandlePacket(t *testing.T) { } if len(actual) != len(scenario.out) { - t.Fatalf("%d. Expected %d events, got %d", i, len(scenario.out), len(actual)) + t.Fatalf("%d. Expected %d events, got %d in scenario '%s'", i, len(scenario.out), len(actual), scenario.name) } for j, expected := range scenario.out { - if fmt.Sprintf("%v", actual[j]) != fmt.Sprintf("%v", expected) { - t.Fatalf("%d.%d. Expected %v, got %v", i, j, actual[j], expected) + if !reflect.DeepEqual(&expected, &actual[j]) { + t.Fatalf("%d.%d. Expected %#v, got %#v in scenario '%s'", i, j, expected, actual[j], scenario.name) } } } diff --git a/exporter.go b/exporter.go index 4283d75..5ed24ca 100644 --- a/exporter.go +++ b/exporter.go @@ -143,31 +143,38 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels) prom type Event interface { MetricName() string Value() float64 + Labels() map[string]string } type CounterEvent struct { metricName string value float64 + labels map[string]string } -func (c *CounterEvent) MetricName() string { return c.metricName } -func (c *CounterEvent) Value() float64 { return c.value } +func (c *CounterEvent) MetricName() string { return c.metricName } +func (c *CounterEvent) Value() float64 { return c.value } +func (c *CounterEvent) Labels() map[string]string { return c.labels } type GaugeEvent struct { metricName string value float64 + labels map[string]string } -func (g *GaugeEvent) MetricName() string { return g.metricName } -func (g *GaugeEvent) Value() float64 { return g.value } +func (g *GaugeEvent) MetricName() string { return g.metricName } +func (g *GaugeEvent) Value() float64 { return g.value } +func (c *GaugeEvent) Labels() map[string]string { return c.labels } type TimerEvent struct { metricName string value float64 + labels map[string]string } -func (t *TimerEvent) MetricName() string { return t.metricName } -func (t *TimerEvent) Value() float64 { return t.value } +func (t *TimerEvent) MetricName() string { return t.metricName } +func (t *TimerEvent) Value() float64 { return t.value } +func (c *TimerEvent) Labels() map[string]string { return c.labels } type Events []Event @@ -194,7 +201,7 @@ func (b *Exporter) Listen(e <-chan Events) { events := <-e for _, event := range events { metricName := "" - prometheusLabels := prometheus.Labels{} + prometheusLabels := event.Labels() labels, present := b.mapper.getMapping(event.MetricName()) if present { @@ -257,22 +264,25 @@ type StatsDListener struct { conn *net.UDPConn } -func buildEvent(statType, metric string, value float64) (Event, error) { +func buildEvent(statType, metric string, value float64, labels map[string]string) (Event, error) { switch statType { case "c": return &CounterEvent{ metricName: metric, value: float64(value), + labels: labels, }, nil case "g": return &GaugeEvent{ metricName: metric, value: float64(value), + labels: labels, }, nil - case "ms": + case "ms", "h": return &TimerEvent{ metricName: metric, value: float64(value), + labels: labels, }, nil case "s": return nil, fmt.Errorf("No support for StatsD sets") @@ -301,23 +311,30 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { continue } - elements := strings.Split(line, ":") + elements := strings.SplitN(line, ":", 2) if len(elements) < 2 { networkStats.WithLabelValues("malformed_line").Inc() log.Println("Bad line from StatsD:", line) continue } metric := elements[0] - samples := elements[1:] + var samples []string + if strings.Contains(elements[1], "|#") { + // using datadog extensions, disable multi-metrics + samples = elements[1:] + } else { + samples = strings.Split(elements[1], ":") + } for _, sample := range samples { components := strings.Split(sample, "|") samplingFactor := 1.0 - if len(components) < 2 || len(components) > 3 { + if len(components) < 2 || len(components) > 4 { networkStats.WithLabelValues("malformed_component").Inc() log.Println("Bad component on line:", line) continue } valueStr, statType := components[0], components[1] + labels := map[string]string{} value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Printf("Bad value %s on line: %s", valueStr, line) @@ -325,32 +342,45 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { continue } - if len(components) == 3 { - if statType != "c" { - log.Println("Illegal sampling factor for non-counter metric on line", line) - networkStats.WithLabelValues("illegal_sample_factor").Inc() + if len(components) >= 3 { + for _, component := range components[2:] { + switch component[0] { + case '@': + if statType != "c" { + log.Println("Illegal sampling factor for non-counter metric on line", line) + networkStats.WithLabelValues("illegal_sample_factor").Inc() + } + samplingFactor, err = strconv.ParseFloat(component[1:], 64) + if err != nil { + log.Printf("Invalid sampling factor %s on line %s", component[1:], line) + networkStats.WithLabelValues("invalid_sample_factor").Inc() + } + if samplingFactor == 0 { + samplingFactor = 1 + } + value /= samplingFactor + case '#': + networkStats.WithLabelValues("dogstasd_tags").Inc() + tags := strings.Split(component[1:], ",") + for _, t := range tags { + kv := strings.Split(t, ":") + if len(kv) == 2 { + if len(kv[1]) > 0 { + labels[kv[0]] = kv[1] + } + } else if len(kv) == 1 { + labels[kv[0]] = "." + } + } + default: + log.Printf("Invalid sampling factor or tag section %s on line %s", components[2], line) + networkStats.WithLabelValues("invalid_sample_factor").Inc() + continue + } } - samplingStr := components[2] - if samplingStr[0] != '@' { - log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() - continue - } - samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64) - if err != nil { - log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() - continue - } - if samplingFactor == 0 { - // This should never happen, but avoid division by zero if it does. - log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line) - samplingFactor = 1 - } - value /= samplingFactor } - event, err := buildEvent(statType, metric, value) + event, err := buildEvent(statType, metric, value, labels) if err != nil { log.Printf("Error building event on line %s: %s", line, err) networkStats.WithLabelValues("illegal_event").Inc() From 8f36baf045f9c158431570a99975c10a66747a09 Mon Sep 17 00:00:00 2001 From: Daniel Bonkowski Date: Mon, 7 Mar 2016 15:33:41 +0100 Subject: [PATCH 2/3] sanitize tag keys - add tests for edge cases - fix typo --- bridge_test.go | 20 ++++++++++++++++++++ exporter.go | 16 ++++++++++++---- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/bridge_test.go b/bridge_test.go index 8864a76..4f180a2 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -66,6 +66,26 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."}, }, }, + }, { + name: "datadog tag extension with # in all keys (as sent by datadog php client)", + in: "foo:100|c|#tag1:bar,#tag2:baz,#tag3,#tag4", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."}, + }, + }, + }, { + name: "datadog tag extension with tags unsupported by prometheus", + in: "foo:100|c|#09digits:0,tag.with.dots,tag_with_empty_value:", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{"_09digits": "0", "tag_with_dots": ".", "tag_with_empty_value": "."}, + }, + }, }, { name: "datadog tag extension with sampling", in: "foo:100|c|@0.1|#tag1:bar,tag2,tag3:baz", diff --git a/exporter.go b/exporter.go index 5ed24ca..1a0b369 100644 --- a/exporter.go +++ b/exporter.go @@ -360,17 +360,25 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { } value /= samplingFactor case '#': - networkStats.WithLabelValues("dogstasd_tags").Inc() - tags := strings.Split(component[1:], ",") + networkStats.WithLabelValues("dogstatsd_tags").Inc() + tags := strings.Split(component, ",") for _, t := range tags { + t = strings.TrimPrefix(t, "#") kv := strings.Split(t, ":") + tag_key := kv[0] + tag_key = escapeMetricName(tag_key) + + var tag_value string if len(kv) == 2 { if len(kv[1]) > 0 { - labels[kv[0]] = kv[1] + tag_value = kv[1] + } else { + tag_value = "." } } else if len(kv) == 1 { - labels[kv[0]] = "." + tag_value = "." } + labels[tag_key] = tag_value } default: log.Printf("Invalid sampling factor or tag section %s on line %s", components[2], line) From 0b792e0be6e4bd4754647eceacb60507c7e70448 Mon Sep 17 00:00:00 2001 From: Ilya Margolin Date: Sat, 23 Apr 2016 23:50:41 +0200 Subject: [PATCH 3/3] DogStatsD: review changes --- README.md | 10 +++++++++ bridge_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++-------- exporter.go | 42 ++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 90bea40..af1f3d3 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,16 @@ We recommend this only as an intermediate solution and recommend switching to [native Prometheus instrumentation](http://prometheus.io/docs/instrumenting/clientlibs/) in the long term. +### DogStatsD extensions + +The exporter will convert DogStatsD-style tags to prometheus labels. See +[Tags](http://docs.datadoghq.com/guides/dogstatsd/#tags) in the DogStatsD +documentation for the concept description and +[Datagram Format](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) +for specifics. It boils down to appending +`|#tag:value,another_tag:another_value` to the normal StatsD format. Tags +without values (`#some_tag`) are not supported. + ## Building and Running $ go build diff --git a/bridge_test.go b/bridge_test.go index 4f180a2..05cc90a 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -58,42 +58,82 @@ func TestHandlePacket(t *testing.T) { }, }, { name: "datadog tag extension", - in: "foo:100|c|#tag1:bar,tag2:baz,tag3,tag4", + in: "foo:100|c|#tag1:bar,tag2:baz", out: Events{ &CounterEvent{ metricName: "foo", value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."}, + labels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "datadog tag extension with # in all keys (as sent by datadog php client)", - in: "foo:100|c|#tag1:bar,#tag2:baz,#tag3,#tag4", + in: "foo:100|c|#tag1:bar,#tag2:baz", out: Events{ &CounterEvent{ metricName: "foo", value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz", "tag3": ".", "tag4": "."}, + labels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { - name: "datadog tag extension with tags unsupported by prometheus", - in: "foo:100|c|#09digits:0,tag.with.dots,tag_with_empty_value:", + name: "datadog tag extension with tag keys unsupported by prometheus", + in: "foo:100|c|#09digits:0,tag.with.dots:1", out: Events{ &CounterEvent{ metricName: "foo", value: 100, - labels: map[string]string{"_09digits": "0", "tag_with_dots": ".", "tag_with_empty_value": "."}, + labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, + }, + }, + }, { + name: "datadog tag extension with valueless tags: ignored", + in: "foo:100|c|#tag_without_a_value", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{}, + }, + }, + }, { + name: "datadog tag extension with valueless tags (edge case)", + in: "foo:100|c|#tag_without_a_value,tag:value", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{"tag": "value"}, + }, + }, + }, { + name: "datadog tag extension with empty tags (edge case)", + in: "foo:100|c|#tag:value,,", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 100, + labels: map[string]string{"tag": "value"}, }, }, }, { name: "datadog tag extension with sampling", - in: "foo:100|c|@0.1|#tag1:bar,tag2,tag3:baz", + in: "foo:100|c|@0.1|#tag1:bar,#tag2:baz", out: Events{ &CounterEvent{ metricName: "foo", value: 1000, - labels: map[string]string{"tag1": "bar", "tag2": ".", "tag3": "baz"}, + labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + }, + }, + }, { + name: "datadog tag extension with multiple colons", + in: "foo:100|c|@0.1|#tag1:foo:bar", + out: Events{ + &CounterEvent{ + metricName: "foo", + value: 1000, + labels: map[string]string{"tag1": "foo:bar"}, }, }, }, { diff --git a/exporter.go b/exporter.go index 1a0b369..9fd55d8 100644 --- a/exporter.go +++ b/exporter.go @@ -303,6 +303,25 @@ func (l *StatsDListener) Listen(e chan<- Events) { } } +func parseDogStatsDTagsToLabels(component string) map[string]string { + labels := map[string]string{} + networkStats.WithLabelValues("dogstatsd_tags").Inc() + tags := strings.Split(component, ",") + for _, t := range tags { + t = strings.TrimPrefix(t, "#") + kv := strings.SplitN(t, ":", 2) + + if len(kv) < 2 || len(kv[1]) == 0 { + networkStats.WithLabelValues("malformed_dogstatsd_tag").Inc() + log.Printf("Malformed or empty DogStatsD tag %s in component %s", t, component) + continue + } + + labels[escapeMetricName(kv[0])] = kv[1] + } + return labels +} + func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { lines := strings.Split(string(packet), "\n") events := Events{} @@ -334,7 +353,6 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { continue } valueStr, statType := components[0], components[1] - labels := map[string]string{} value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Printf("Bad value %s on line: %s", valueStr, line) @@ -342,6 +360,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { continue } + labels := map[string]string{} if len(components) >= 3 { for _, component := range components[2:] { switch component[0] { @@ -360,26 +379,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { } value /= samplingFactor case '#': - networkStats.WithLabelValues("dogstatsd_tags").Inc() - tags := strings.Split(component, ",") - for _, t := range tags { - t = strings.TrimPrefix(t, "#") - kv := strings.Split(t, ":") - tag_key := kv[0] - tag_key = escapeMetricName(tag_key) - - var tag_value string - if len(kv) == 2 { - if len(kv[1]) > 0 { - tag_value = kv[1] - } else { - tag_value = "." - } - } else if len(kv) == 1 { - tag_value = "." - } - labels[tag_key] = tag_value - } + labels = parseDogStatsDTagsToLabels(component) default: log.Printf("Invalid sampling factor or tag section %s on line %s", components[2], line) networkStats.WithLabelValues("invalid_sample_factor").Inc()