Implement config watching and reloading.

This commit is contained in:
Julius Volz 2013-07-08 20:37:12 +02:00
parent fe0359ec09
commit eff54bdcf9
5 changed files with 97 additions and 28 deletions

View file

@ -298,14 +298,3 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
} }
e <- events e <- events
} }
var (
eventStats = prometheus.NewCounter()
networkStats = prometheus.NewCounter()
)
func init() {
prometheus.Register("statsd_bridge_events_total", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats)
prometheus.Register("statsd_bridge_packets_total", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats)
}

43
main.go
View file

@ -14,6 +14,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/howeyc/fsnotify"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/exp" "github.com/prometheus/client_golang/prometheus/exp"
) )
@ -21,7 +22,7 @@ import (
var ( var (
listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.")
statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.") statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.")
mappingConfig = flag.String("mappingConfig", "mapping.conf", "Metric mapping configuration file name.") mappingConfig = flag.String("mappingConfig", "", "Metric mapping configuration file name.")
summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.") summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.")
) )
@ -56,6 +57,39 @@ func udpAddrFromString(addr string) *net.UDPAddr {
} }
} }
func watchConfig(fileName string, mapper *metricMapper) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
err = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY)
if err != nil {
log.Fatal(err)
}
for {
select {
case ev := <-watcher.Event:
log.Printf("Config file changed (%s), attempting reload", ev)
err = mapper.initFromFile(fileName)
if err != nil {
log.Println("Error reloading config:", err)
configLoads.Increment(map[string]string{"outcome": "failure"})
} else {
log.Println("Config reloaded successfully")
configLoads.Increment(map[string]string{"outcome": "success"})
}
// Re-add the file watcher since it can get lost on some changes. E.g.
// saving a file with vim results in a RENAME-MODIFY-DELETE event
// sequence, after which the newly written file is no longer watched.
err = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY)
case err := <-watcher.Error:
log.Println("Error watching config:", err)
}
}
}
func main() { func main() {
flag.Parse() flag.Parse()
@ -76,14 +110,15 @@ func main() {
l := &StatsDListener{conn: conn} l := &StatsDListener{conn: conn}
go l.Listen(events) go l.Listen(events)
mapper := metricMapper{} mapper := &metricMapper{}
if mappingConfig != nil { if *mappingConfig != "" {
err := mapper.initFromFile(*mappingConfig) err := mapper.initFromFile(*mappingConfig)
if err != nil { if err != nil {
log.Fatal("Error loading config:", err) log.Fatal("Error loading config:", err)
} }
go watchConfig(*mappingConfig, mapper)
} }
bridge := NewBridge(&mapper) bridge := NewBridge(mapper)
go func() { go func() {
for _ = range time.Tick(*summaryFlushInterval) { for _ = range time.Tick(*summaryFlushInterval) {
bridge.Summaries.Flush() bridge.Summaries.Flush()

View file

@ -11,6 +11,9 @@ import (
"io/ioutil" "io/ioutil"
"regexp" "regexp"
"strings" "strings"
"sync"
"github.com/prometheus/client_golang/prometheus"
) )
var ( var (
@ -26,6 +29,7 @@ type metricMapping struct {
type metricMapper struct { type metricMapper struct {
mappings []metricMapping mappings []metricMapping
mutex sync.Mutex
} }
type configLoadStates int type configLoadStates int
@ -35,11 +39,12 @@ const (
METRIC_DEFINITION METRIC_DEFINITION
) )
func (l *metricMapper) initFromString(fileContents string) error { func (m *metricMapper) initFromString(fileContents string) error {
lines := strings.Split(fileContents, "\n") lines := strings.Split(fileContents, "\n")
state := SEARCHING state := SEARCHING
mapping := metricMapping{labels: map[string]string{}} parsedMappings := []metricMapping{}
currentMapping := metricMapping{labels: map[string]string{}}
for i, line := range lines { for i, line := range lines {
line := strings.TrimSpace(line) line := strings.TrimSpace(line)
@ -51,24 +56,28 @@ func (l *metricMapper) initFromString(fileContents string) error {
if !metricLineRE.MatchString(line) { if !metricLineRE.MatchString(line) {
return fmt.Errorf("Line %d: expected metric match line, got: %s", i, line) return fmt.Errorf("Line %d: expected metric match line, got: %s", i, line)
} }
// Translate the glob-style metric match line into a proper regex that we
// can use to match metrics later on.
metricRe := strings.Replace(line, ".", "\\.", -1) metricRe := strings.Replace(line, ".", "\\.", -1)
metricRe = strings.Replace(metricRe, "*", "([^.]+)", -1) metricRe = strings.Replace(metricRe, "*", "([^.]+)", -1)
mapping.regex = regexp.MustCompile("^" + metricRe + "$") currentMapping.regex = regexp.MustCompile("^" + metricRe + "$")
state = METRIC_DEFINITION state = METRIC_DEFINITION
case METRIC_DEFINITION: case METRIC_DEFINITION:
if line == "" { if line == "" {
if len(mapping.labels) == 0 { if len(currentMapping.labels) == 0 {
return fmt.Errorf("Line %d: metric mapping didn't set any labels", i) return fmt.Errorf("Line %d: metric mapping didn't set any labels", i)
} }
if _, ok := mapping.labels["name"]; !ok { if _, ok := currentMapping.labels["name"]; !ok {
return fmt.Errorf("Line %d: metric mapping didn't set a metric name", i) return fmt.Errorf("Line %d: metric mapping didn't set a metric name", i)
} }
l.mappings = append(l.mappings, mapping) parsedMappings = append(parsedMappings, currentMapping)
state = SEARCHING state = SEARCHING
mapping = metricMapping{labels: map[string]string{}} currentMapping = metricMapping{labels: map[string]string{}}
continue continue
} }
@ -76,24 +85,34 @@ func (l *metricMapper) initFromString(fileContents string) error {
if len(matches) != 3 { if len(matches) != 3 {
return fmt.Errorf("Line %d: expected label mapping line, got: %s", i, line) return fmt.Errorf("Line %d: expected label mapping line, got: %s", i, line)
} }
mapping.labels[matches[1]] = matches[2] currentMapping.labels[matches[1]] = matches[2]
default: default:
panic("illegal state") panic("illegal state")
} }
} }
m.mutex.Lock()
defer m.mutex.Unlock()
m.mappings = parsedMappings
mappingsCount.Set(prometheus.NilLabels, float64(len(parsedMappings)))
return nil return nil
} }
func (l *metricMapper) initFromFile(fileName string) error { func (m *metricMapper) initFromFile(fileName string) error {
mappingStr, err := ioutil.ReadFile(fileName) mappingStr, err := ioutil.ReadFile(fileName)
if err != nil { if err != nil {
return err return err
} }
return l.initFromString(string(mappingStr)) return m.initFromString(string(mappingStr))
} }
func (l *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) {
for _, mapping := range l.mappings { m.mutex.Lock()
defer m.mutex.Unlock()
for _, mapping := range m.mappings {
matches := mapping.regex.FindStringSubmatchIndex(statsdMetric) matches := mapping.regex.FindStringSubmatchIndex(statsdMetric)
if len(matches) == 0 { if len(matches) == 0 {
continue continue

View file

@ -71,8 +71,8 @@ func TestMetricMapper(t *testing.T) {
}, },
} }
for i, scenario := range scenarios {
mapper := metricMapper{} mapper := metricMapper{}
for i, scenario := range scenarios {
err := mapper.initFromString(scenario.config) err := mapper.initFromString(scenario.config)
if err != nil && !scenario.configBad { if err != nil && !scenario.configBad {
t.Fatalf("%d. Config load error: %s", i, err) t.Fatalf("%d. Config load error: %s", i, err)

26
telemetry.go Normal file
View file

@ -0,0 +1,26 @@
// Copyright (c) 2013, Prometheus Team
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
eventStats = prometheus.NewCounter()
networkStats = prometheus.NewCounter()
configLoads = prometheus.NewCounter()
mappingsCount = prometheus.NewGauge()
)
func init() {
prometheus.Register("statsd_bridge_events_count", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats)
prometheus.Register("statsd_bridge_packets_count", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats)
prometheus.Register("statsd_bridge_config_reloads_count", "The number of configuration reloads.", prometheus.NilLabels, configLoads)
prometheus.Register("statsd_bridge_loaded_mappings_count", "The number of configured metric mappings.", prometheus.NilLabels, mappingsCount)
}