diff --git a/main.go b/main.go index e825fd9..8b46f1b 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( var ( listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") statsdListeningAddress = flag.String("statsdListeningAddress", ":8126", "The UDP address on which to receive statsd metric lines.") + mappingConfig = flag.String("mappingConfig", "mapping.conf", "Metric mapping configuration file name.") summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.") ) @@ -127,6 +128,7 @@ type Bridge struct { Counters *CounterContainer Gauges *GaugeContainer Summaries *SummaryContainer + mapper *metricMapper } func escapeMetricName(metricName string) string { @@ -141,29 +143,43 @@ func (b *Bridge) Listen(e <-chan Events) { for { events := <-e for _, event := range events { - metricName := escapeMetricName(event.MetricName()) + metricName := "" + prometheusLabels := map[string]string{} + + labels, present := b.mapper.getMapping(event.MetricName()) + if present { + metricName = labels["name"] + for label, value := range labels { + if label != "name" { + prometheusLabels[label] = value + } + } + } else { + metricName = escapeMetricName(event.MetricName()) + } + switch event.(type) { case *CounterEvent: counter := b.Counters.Get(metricName + "_counter") - counter.IncrementBy(prometheus.NilLabels, event.Value()) + counter.IncrementBy(prometheusLabels, event.Value()) eventStats.Increment(map[string]string{"type": "counter"}) case *GaugeEvent: gauge := b.Gauges.Get(metricName + "_gauge") - gauge.Set(prometheus.NilLabels, event.Value()) + gauge.Set(prometheusLabels, event.Value()) eventStats.Increment(map[string]string{"type": "gauge"}) case *TimerEvent: summary := b.Summaries.Get(metricName + "_timer") - summary.Add(prometheus.NilLabels, event.Value()) + summary.Add(prometheusLabels, event.Value()) sum := b.Counters.Get(metricName + "_timer_total") - sum.IncrementBy(prometheus.NilLabels, event.Value()) + sum.IncrementBy(prometheusLabels, event.Value()) count := b.Counters.Get(metricName + "_timer_count") - count.Increment(prometheus.NilLabels) + count.Increment(prometheusLabels) eventStats.Increment(map[string]string{"type": "timer"}) @@ -175,11 +191,12 @@ func (b *Bridge) Listen(e <-chan Events) { } } -func NewBridge() *Bridge { +func NewBridge(mapper *metricMapper) *Bridge { return &Bridge{ Counters: NewCounterContainer(), Gauges: NewGaugeContainer(), Summaries: NewSummaryContainer(), + mapper: mapper, } } @@ -344,7 +361,14 @@ func main() { l := &StatsDListener{conn: conn} go l.Listen(events) - bridge := NewBridge() + mapper := metricMapper{} + if mappingConfig != nil { + err := mapper.initFromFile(*mappingConfig) + if err != nil { + log.Fatal("Error loading config:", err) + } + } + bridge := NewBridge(&mapper) go func() { for _ = range time.Tick(*summaryFlushInterval) { bridge.Summaries.Flush() diff --git a/mapper.go b/mapper.go new file mode 100644 index 0000000..c856cc4 --- /dev/null +++ b/mapper.go @@ -0,0 +1,111 @@ +// Copyright (c) 2013, Prometheus Team +// All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "io/ioutil" + "regexp" + "strings" +) + +var ( + identifierRE = `[a-zA-Z_][a-zA-Z0-9_]+` + metricLineRE = regexp.MustCompile(`^(\*\.|` + identifierRE + `\.)+(\*|` + identifierRE + `)$`) + labelLineRE = regexp.MustCompile(`^(` + identifierRE + `)\s*=\s*"(.*)"$`) +) + +type metricMapping struct { + regex *regexp.Regexp + labels map[string]string +} + +type metricMapper struct { + mappings []metricMapping +} + +type configLoadStates int + +const ( + SEARCHING configLoadStates = iota + METRIC_DEFINITION +) + +func (l *metricMapper) initFromString(fileContents string) error { + lines := strings.Split(fileContents, "\n") + state := SEARCHING + + mapping := metricMapping{labels: map[string]string{}} + for i, line := range lines { + line := strings.TrimSpace(line) + + switch state { + case SEARCHING: + if line == "" { + continue + } + if !metricLineRE.MatchString(line) { + return fmt.Errorf("Line %d: expected metric match line, got: %s", i, line) + } + metricRe := strings.Replace(line, ".", "\\.", -1) + metricRe = strings.Replace(metricRe, "*", "([^.]+)", -1) + mapping.regex = regexp.MustCompile("^" + metricRe + "$") + state = METRIC_DEFINITION + + case METRIC_DEFINITION: + if line == "" { + if len(mapping.labels) == 0 { + return fmt.Errorf("Line %d: metric mapping didn't set any labels", i) + } + if _, ok := mapping.labels["name"]; !ok { + return fmt.Errorf("Line %d: metric mapping didn't set a metric name", i) + } + + l.mappings = append(l.mappings, mapping) + + state = SEARCHING + mapping = metricMapping{labels: map[string]string{}} + continue + } + + matches := labelLineRE.FindStringSubmatch(line) + if len(matches) != 3 { + return fmt.Errorf("Line %d: expected label mapping line, got: %s", i, line) + } + mapping.labels[matches[1]] = matches[2] + default: + panic("illegal state") + } + } + return nil +} + +func (l *metricMapper) initFromFile(fileName string) error { + mappingStr, err := ioutil.ReadFile(fileName) + if err != nil { + return err + } + return l.initFromString(string(mappingStr)) +} + +func (l *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { + for _, mapping := range l.mappings { + matches := mapping.regex.FindStringSubmatchIndex(statsdMetric) + if len(matches) == 0 { + continue + } + + labels := map[string]string{} + for label, valueExpr := range mapping.labels { + value := mapping.regex.ExpandString([]byte{}, valueExpr, statsdMetric, matches) + labels[label] = string(value) + } + return labels, true + } + + return nil, false +} diff --git a/mapper_test.go b/mapper_test.go new file mode 100644 index 0000000..eb89a2e --- /dev/null +++ b/mapper_test.go @@ -0,0 +1,99 @@ +// Copyright (c) 2013, Prometheus Team +// All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "testing" +) + +func TestMetricMapper(t *testing.T) { + scenarios := []struct { + config string + configBad bool + mappings map[string]map[string]string + }{ + // Empty config. + {}, + // Config with several mapping definitions. + { + config: ` + test.dispatcher.*.*.* + name="dispatch_events" + processor="$1" + action="$2" + result="$3" + job="test_dispatcher" + + *.* + name="catchall" + first="$1" + second="$2" + third="$3" + job="$1-$2-$3" + `, + mappings: map[string]map[string]string{ + "test.dispatcher.FooProcessor.send.succeeded": map[string]string{ + "name": "dispatch_events", + "processor": "FooProcessor", + "action": "send", + "result": "succeeded", + "job": "test_dispatcher", + }, + "foo.bar": map[string]string{ + "name": "catchall", + "first": "foo", + "second": "bar", + "third": "", + "job": "foo-bar-", + }, + "foo.bar.baz": map[string]string{}, + }, + }, + // Config with bad metric line. + { + config: ` + bad-metric-line.*.* + name="foo" + `, + configBad: true, + }, + // Config with bad label line. + { + config: ` + test.*.* + name=foo + `, + configBad: true, + }, + } + + for i, scenario := range scenarios { + mapper := metricMapper{} + err := mapper.initFromString(scenario.config) + if err != nil && !scenario.configBad { + t.Fatalf("%d. Config load error: %s", i, err) + } + if err == nil && scenario.configBad { + t.Fatalf("%d. Expected bad config, but loaded ok", i) + } + + for metric, mapping := range scenario.mappings { + labels, present := mapper.getMapping(metric) + if len(labels) == 0 && present { + t.Fatalf("%d.%q: Expected metric to not be present", i, metric) + } + if len(labels) != len(mapping) { + t.Fatalf("%d.%q: Expected %d labels, got %d", i, metric, len(mapping), len(labels)) + } + for label, value := range labels { + if mapping[label] != value { + t.Fatalf("%d.%q: Expected labels %v, got %v", i, metric, mapping, labels) + } + } + } + } +}