diff --git a/bridge.go b/bridge.go index b59c3a9..bb86487 100644 --- a/bridge.go +++ b/bridge.go @@ -298,14 +298,3 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { } e <- events } - -var ( - eventStats = prometheus.NewCounter() - networkStats = prometheus.NewCounter() -) - -func init() { - prometheus.Register("statsd_bridge_events_total", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats) - prometheus.Register("statsd_bridge_packets_total", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats) - -} diff --git a/main.go b/main.go index a4448a3..ed6c76a 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "strconv" "time" + "github.com/howeyc/fsnotify" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/exp" ) @@ -21,7 +22,7 @@ import ( var ( listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.") - mappingConfig = flag.String("mappingConfig", "mapping.conf", "Metric mapping configuration file name.") + mappingConfig = flag.String("mappingConfig", "", "Metric mapping configuration file name.") summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.") ) @@ -56,6 +57,39 @@ func udpAddrFromString(addr string) *net.UDPAddr { } } +func watchConfig(fileName string, mapper *metricMapper) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatal(err) + } + + err = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY) + if err != nil { + log.Fatal(err) + } + + for { + select { + case ev := <-watcher.Event: + log.Printf("Config file changed (%s), attempting reload", ev) + err = mapper.initFromFile(fileName) + if err != nil { + log.Println("Error reloading config:", err) + configLoads.Increment(map[string]string{"outcome": "failure"}) + } else { + log.Println("Config reloaded successfully") + configLoads.Increment(map[string]string{"outcome": "success"}) + } + // Re-add the file watcher since it can get lost on some changes. E.g. + // saving a file with vim results in a RENAME-MODIFY-DELETE event + // sequence, after which the newly written file is no longer watched. + err = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY) + case err := <-watcher.Error: + log.Println("Error watching config:", err) + } + } +} + func main() { flag.Parse() @@ -76,14 +110,15 @@ func main() { l := &StatsDListener{conn: conn} go l.Listen(events) - mapper := metricMapper{} - if mappingConfig != nil { + mapper := &metricMapper{} + if *mappingConfig != "" { err := mapper.initFromFile(*mappingConfig) if err != nil { log.Fatal("Error loading config:", err) } + go watchConfig(*mappingConfig, mapper) } - bridge := NewBridge(&mapper) + bridge := NewBridge(mapper) go func() { for _ = range time.Tick(*summaryFlushInterval) { bridge.Summaries.Flush() diff --git a/mapper.go b/mapper.go index c856cc4..7cd374c 100644 --- a/mapper.go +++ b/mapper.go @@ -11,6 +11,9 @@ import ( "io/ioutil" "regexp" "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" ) var ( @@ -26,6 +29,7 @@ type metricMapping struct { type metricMapper struct { mappings []metricMapping + mutex sync.Mutex } type configLoadStates int @@ -35,11 +39,12 @@ const ( METRIC_DEFINITION ) -func (l *metricMapper) initFromString(fileContents string) error { +func (m *metricMapper) initFromString(fileContents string) error { lines := strings.Split(fileContents, "\n") state := SEARCHING - mapping := metricMapping{labels: map[string]string{}} + parsedMappings := []metricMapping{} + currentMapping := metricMapping{labels: map[string]string{}} for i, line := range lines { line := strings.TrimSpace(line) @@ -51,24 +56,28 @@ func (l *metricMapper) initFromString(fileContents string) error { if !metricLineRE.MatchString(line) { return fmt.Errorf("Line %d: expected metric match line, got: %s", i, line) } + + // Translate the glob-style metric match line into a proper regex that we + // can use to match metrics later on. metricRe := strings.Replace(line, ".", "\\.", -1) metricRe = strings.Replace(metricRe, "*", "([^.]+)", -1) - mapping.regex = regexp.MustCompile("^" + metricRe + "$") + currentMapping.regex = regexp.MustCompile("^" + metricRe + "$") + state = METRIC_DEFINITION case METRIC_DEFINITION: if line == "" { - if len(mapping.labels) == 0 { + if len(currentMapping.labels) == 0 { return fmt.Errorf("Line %d: metric mapping didn't set any labels", i) } - if _, ok := mapping.labels["name"]; !ok { + if _, ok := currentMapping.labels["name"]; !ok { return fmt.Errorf("Line %d: metric mapping didn't set a metric name", i) } - l.mappings = append(l.mappings, mapping) + parsedMappings = append(parsedMappings, currentMapping) state = SEARCHING - mapping = metricMapping{labels: map[string]string{}} + currentMapping = metricMapping{labels: map[string]string{}} continue } @@ -76,24 +85,34 @@ func (l *metricMapper) initFromString(fileContents string) error { if len(matches) != 3 { return fmt.Errorf("Line %d: expected label mapping line, got: %s", i, line) } - mapping.labels[matches[1]] = matches[2] + currentMapping.labels[matches[1]] = matches[2] default: panic("illegal state") } } + + m.mutex.Lock() + defer m.mutex.Unlock() + m.mappings = parsedMappings + + mappingsCount.Set(prometheus.NilLabels, float64(len(parsedMappings))) + return nil } -func (l *metricMapper) initFromFile(fileName string) error { +func (m *metricMapper) initFromFile(fileName string) error { mappingStr, err := ioutil.ReadFile(fileName) if err != nil { return err } - return l.initFromString(string(mappingStr)) + return m.initFromString(string(mappingStr)) } -func (l *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { - for _, mapping := range l.mappings { +func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, mapping := range m.mappings { matches := mapping.regex.FindStringSubmatchIndex(statsdMetric) if len(matches) == 0 { continue diff --git a/mapper_test.go b/mapper_test.go index eb89a2e..87ca71a 100644 --- a/mapper_test.go +++ b/mapper_test.go @@ -71,8 +71,8 @@ func TestMetricMapper(t *testing.T) { }, } + mapper := metricMapper{} for i, scenario := range scenarios { - mapper := metricMapper{} err := mapper.initFromString(scenario.config) if err != nil && !scenario.configBad { t.Fatalf("%d. Config load error: %s", i, err) diff --git a/telemetry.go b/telemetry.go new file mode 100644 index 0000000..6c7bb3d --- /dev/null +++ b/telemetry.go @@ -0,0 +1,26 @@ +// Copyright (c) 2013, Prometheus Team +// All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + eventStats = prometheus.NewCounter() + networkStats = prometheus.NewCounter() + configLoads = prometheus.NewCounter() + mappingsCount = prometheus.NewGauge() +) + +func init() { + prometheus.Register("statsd_bridge_events_count", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats) + prometheus.Register("statsd_bridge_packets_count", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats) + prometheus.Register("statsd_bridge_config_reloads_count", "The number of configuration reloads.", prometheus.NilLabels, configLoads) + prometheus.Register("statsd_bridge_loaded_mappings_count", "The number of configured metric mappings.", prometheus.NilLabels, mappingsCount) + +}