Migrate to new client_golang.

This should only be merged once the new client_golang is merged to its
master branch.
This commit is contained in:
Bjoern Rabenstein 2014-06-26 15:56:21 +02:00
parent 728bdc52ae
commit 65e9c49ca8
4 changed files with 140 additions and 79 deletions

153
bridge.go
View file

@ -7,86 +7,132 @@
package main package main
import ( import (
"bytes"
"encoding/binary"
"fmt" "fmt"
"hash/fnv"
"log" "log"
"net" "net"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"github.com/prometheus/client_golang/model"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
const (
defaultHelp = "Metric autogenerated by statsd_bridge."
regErrF = "A change of configuration created inconsistent metrics for " +
"%q. You have to restart the statsd_bridge, and you should " +
"consider the effects on your monitoring setup. Error: %s"
)
var ( var (
illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
hash = fnv.New64a()
strBuf bytes.Buffer // Used for hashing.
intBuf = make([]byte, 8)
) )
// hashNameAndLabels returns a hash value of the provided name string and all
// the label names and values in the provided labels map.
//
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
// allocations.)
func hashNameAndLabels(name string, labels prometheus.Labels) uint64 {
hash.Reset()
strBuf.Reset()
strBuf.WriteString(name)
hash.Write(strBuf.Bytes())
binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels))
hash.Write(intBuf)
return hash.Sum64()
}
type CounterContainer struct { type CounterContainer struct {
Elements map[string]prometheus.Counter Elements map[uint64]prometheus.Counter
} }
func NewCounterContainer() *CounterContainer { func NewCounterContainer() *CounterContainer {
return &CounterContainer{ return &CounterContainer{
Elements: make(map[string]prometheus.Counter), Elements: make(map[uint64]prometheus.Counter),
} }
} }
func (c *CounterContainer) Get(metricName string) prometheus.Counter { func (c *CounterContainer) Get(metricName string, labels prometheus.Labels) prometheus.Counter {
counter, ok := c.Elements[metricName] hash := hashNameAndLabels(metricName, labels)
counter, ok := c.Elements[hash]
if !ok { if !ok {
counter = prometheus.NewCounter() counter = prometheus.NewCounter(prometheus.CounterOpts{
c.Elements[metricName] = counter Name: metricName,
prometheus.Register(metricName, "", prometheus.NilLabels, counter) Help: defaultHelp,
ConstLabels: labels,
})
c.Elements[hash] = counter
if _, err := prometheus.Register(counter); err != nil {
log.Fatalf(regErrF, metricName, err)
}
} }
return counter return counter
} }
type GaugeContainer struct { type GaugeContainer struct {
Elements map[string]prometheus.Gauge Elements map[uint64]prometheus.Gauge
} }
func NewGaugeContainer() *GaugeContainer { func NewGaugeContainer() *GaugeContainer {
return &GaugeContainer{ return &GaugeContainer{
Elements: make(map[string]prometheus.Gauge), Elements: make(map[uint64]prometheus.Gauge),
} }
} }
func (c *GaugeContainer) Get(metricName string) prometheus.Gauge { func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels) prometheus.Gauge {
gauge, ok := c.Elements[metricName] hash := hashNameAndLabels(metricName, labels)
gauge, ok := c.Elements[hash]
if !ok { if !ok {
gauge = prometheus.NewGauge() gauge = prometheus.NewGauge(prometheus.GaugeOpts{
c.Elements[metricName] = gauge Name: metricName,
prometheus.Register(metricName, "", prometheus.NilLabels, gauge) Help: defaultHelp,
ConstLabels: labels,
})
c.Elements[hash] = gauge
if _, err := prometheus.Register(gauge); err != nil {
log.Fatalf(regErrF, metricName, err)
}
} }
return gauge return gauge
} }
type SummaryContainer struct { type SummaryContainer struct {
Elements map[string]prometheus.Histogram Elements map[uint64]prometheus.Summary
} }
func NewSummaryContainer() *SummaryContainer { func NewSummaryContainer() *SummaryContainer {
return &SummaryContainer{ return &SummaryContainer{
Elements: make(map[string]prometheus.Histogram), Elements: make(map[uint64]prometheus.Summary),
} }
} }
func (c *SummaryContainer) Get(metricName string) prometheus.Histogram { func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels) prometheus.Summary {
summary, ok := c.Elements[metricName] hash := hashNameAndLabels(metricName, labels)
summary, ok := c.Elements[hash]
if !ok { if !ok {
summary = prometheus.NewDefaultHistogram() summary = prometheus.NewSummary(
c.Elements[metricName] = summary prometheus.SummaryOpts{
prometheus.Register(metricName, "", prometheus.NilLabels, summary) Name: metricName,
Help: defaultHelp,
ConstLabels: labels,
})
c.Elements[hash] = summary
if _, err := prometheus.Register(summary); err != nil {
log.Fatalf(regErrF, metricName, err)
}
} }
return summary return summary
} }
func (c *SummaryContainer) Flush() {
for _, summary := range c.Elements {
summary.ResetAll()
}
}
type Event interface { type Event interface {
MetricName() string MetricName() string
Value() float64 Value() float64
@ -141,7 +187,7 @@ func (b *Bridge) Listen(e <-chan Events) {
events := <-e events := <-e
for _, event := range events { for _, event := range events {
metricName := "" metricName := ""
prometheusLabels := map[string]string{} prometheusLabels := prometheus.Labels{}
labels, present := b.mapper.getMapping(event.MetricName()) labels, present := b.mapper.getMapping(event.MetricName())
if present { if present {
@ -157,32 +203,35 @@ func (b *Bridge) Listen(e <-chan Events) {
switch event.(type) { switch event.(type) {
case *CounterEvent: case *CounterEvent:
counter := b.Counters.Get(metricName + "_counter") counter := b.Counters.Get(
counter.IncrementBy(prometheusLabels, event.Value()) metricName+"_counter",
prometheusLabels,
)
counter.Add(event.Value())
eventStats.Increment(map[string]string{"type": "counter"}) eventStats.WithLabelValues("counter").Inc()
case *GaugeEvent: case *GaugeEvent:
gauge := b.Gauges.Get(metricName + "_gauge") gauge := b.Gauges.Get(
gauge.Set(prometheusLabels, event.Value()) metricName+"_gauge",
prometheusLabels,
)
gauge.Set(event.Value())
eventStats.Increment(map[string]string{"type": "gauge"}) eventStats.WithLabelValues("gauge").Inc()
case *TimerEvent: case *TimerEvent:
summary := b.Summaries.Get(metricName + "_timer") summary := b.Summaries.Get(
summary.Add(prometheusLabels, event.Value()) metricName+"_timer",
prometheusLabels,
)
summary.Observe(event.Value())
sum := b.Counters.Get(metricName + "_timer_total") eventStats.WithLabelValues("timer").Inc()
sum.IncrementBy(prometheusLabels, event.Value())
count := b.Counters.Get(metricName + "_timer_count")
count.Increment(prometheusLabels)
eventStats.Increment(map[string]string{"type": "timer"})
default: default:
log.Println("Unsupported event type") log.Println("Unsupported event type")
eventStats.Increment(map[string]string{"type": "illegal"}) eventStats.WithLabelValues("illegal").Inc()
} }
} }
} }
@ -247,7 +296,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
elements := strings.Split(line, ":") elements := strings.Split(line, ":")
if len(elements) < 2 { if len(elements) < 2 {
networkStats.Increment(map[string]string{"type": "malformed_line"}) networkStats.WithLabelValues("malformed_line").Inc()
log.Println("Bad line from StatsD:", line) log.Println("Bad line from StatsD:", line)
continue continue
} }
@ -257,7 +306,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
components := strings.Split(sample, "|") components := strings.Split(sample, "|")
samplingFactor := 1.0 samplingFactor := 1.0
if len(components) < 2 || len(components) > 3 { if len(components) < 2 || len(components) > 3 {
networkStats.Increment(map[string]string{"type": "malformed_component"}) networkStats.WithLabelValues("malformed_component").Inc()
log.Println("Bad component on line:", line) log.Println("Bad component on line:", line)
continue continue
} }
@ -265,25 +314,25 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
value, err := strconv.ParseFloat(valueStr, 64) value, err := strconv.ParseFloat(valueStr, 64)
if err != nil { if err != nil {
log.Printf("Bad value %s on line: %s", valueStr, line) log.Printf("Bad value %s on line: %s", valueStr, line)
networkStats.Increment(map[string]string{"type": "malformed_value"}) networkStats.WithLabelValues("malformed_value").Inc()
continue continue
} }
if len(components) == 3 { if len(components) == 3 {
if statType != "c" { if statType != "c" {
log.Println("Illegal sampling factor for non-counter metric on line", line) log.Println("Illegal sampling factor for non-counter metric on line", line)
networkStats.Increment(map[string]string{"type": "illegal_sample_factor"}) networkStats.WithLabelValues("illegal_sample_factor").Inc()
} }
samplingStr := components[2] samplingStr := components[2]
if samplingStr[0] != '@' { if samplingStr[0] != '@' {
log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) log.Printf("Invalid sampling factor %s on line %s", samplingStr, line)
networkStats.Increment(map[string]string{"type": "invalid_sample_factor"}) networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue continue
} }
samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64) samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64)
if err != nil { if err != nil {
log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) log.Printf("Invalid sampling factor %s on line %s", samplingStr, line)
networkStats.Increment(map[string]string{"type": "invalid_sample_factor"}) networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue continue
} }
if samplingFactor == 0 { if samplingFactor == 0 {
@ -297,11 +346,11 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
event, err := buildEvent(statType, metric, value) event, err := buildEvent(statType, metric, value)
if err != nil { if err != nil {
log.Printf("Error building event on line %s: %s", line, err) log.Printf("Error building event on line %s: %s", line, err)
networkStats.Increment(map[string]string{"type": "illegal_event"}) networkStats.WithLabelValues("illegal_event").Inc()
continue continue
} }
events = append(events, event) events = append(events, event)
networkStats.Increment(map[string]string{"type": "legal"}) networkStats.WithLabelValues("legal").Inc()
} }
} }
e <- events e <- events

16
main.go
View file

@ -12,23 +12,20 @@ import (
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/howeyc/fsnotify" "github.com/howeyc/fsnotify"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/exp"
) )
var ( var (
listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.")
statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.") statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.")
mappingConfig = flag.String("mappingConfig", "", "Metric mapping configuration file name.") mappingConfig = flag.String("mappingConfig", "", "Metric mapping configuration file name.")
summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.")
) )
func serveHTTP() { func serveHTTP() {
exp.Handle(prometheus.ExpositionResource, prometheus.DefaultHandler) http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(*listeningAddress, exp.DefaultCoarseMux) http.ListenAndServe(*listeningAddress, nil)
} }
func udpAddrFromString(addr string) *net.UDPAddr { func udpAddrFromString(addr string) *net.UDPAddr {
@ -75,10 +72,10 @@ func watchConfig(fileName string, mapper *metricMapper) {
err = mapper.initFromFile(fileName) err = mapper.initFromFile(fileName)
if err != nil { if err != nil {
log.Println("Error reloading config:", err) log.Println("Error reloading config:", err)
configLoads.Increment(map[string]string{"outcome": "failure"}) configLoads.WithLabelValues("failure").Inc()
} else { } else {
log.Println("Config reloaded successfully") log.Println("Config reloaded successfully")
configLoads.Increment(map[string]string{"outcome": "success"}) configLoads.WithLabelValues("success").Inc()
} }
// Re-add the file watcher since it can get lost on some changes. E.g. // Re-add the file watcher since it can get lost on some changes. E.g.
// saving a file with vim results in a RENAME-MODIFY-DELETE event // saving a file with vim results in a RENAME-MODIFY-DELETE event
@ -119,10 +116,5 @@ func main() {
go watchConfig(*mappingConfig, mapper) go watchConfig(*mappingConfig, mapper)
} }
bridge := NewBridge(mapper) bridge := NewBridge(mapper)
go func() {
for _ = range time.Tick(*summaryFlushInterval) {
bridge.Summaries.Flush()
}
}()
bridge.Listen(events) bridge.Listen(events)
} }

View file

@ -25,7 +25,7 @@ var (
type metricMapping struct { type metricMapping struct {
regex *regexp.Regexp regex *regexp.Regexp
labels map[string]string labels prometheus.Labels
} }
type metricMapper struct { type metricMapper struct {
@ -45,7 +45,7 @@ func (m *metricMapper) initFromString(fileContents string) error {
state := SEARCHING state := SEARCHING
parsedMappings := []metricMapping{} parsedMappings := []metricMapping{}
currentMapping := metricMapping{labels: map[string]string{}} currentMapping := metricMapping{labels: prometheus.Labels{}}
for i, line := range lines { for i, line := range lines {
line := strings.TrimSpace(line) line := strings.TrimSpace(line)
@ -78,7 +78,7 @@ func (m *metricMapper) initFromString(fileContents string) error {
parsedMappings = append(parsedMappings, currentMapping) parsedMappings = append(parsedMappings, currentMapping)
state = SEARCHING state = SEARCHING
currentMapping = metricMapping{labels: map[string]string{}} currentMapping = metricMapping{labels: prometheus.Labels{}}
continue continue
} }
@ -100,7 +100,7 @@ func (m *metricMapper) initFromString(fileContents string) error {
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.mappings = parsedMappings m.mappings = parsedMappings
mappingsCount.Set(prometheus.NilLabels, float64(len(parsedMappings))) mappingsCount.Set(float64(len(parsedMappings)))
return nil return nil
} }
@ -113,7 +113,7 @@ func (m *metricMapper) initFromFile(fileName string) error {
return m.initFromString(string(mappingStr)) return m.initFromString(string(mappingStr))
} }
func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { func (m *metricMapper) getMapping(statsdMetric string) (labels prometheus.Labels, present bool) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
@ -123,7 +123,7 @@ func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string
continue continue
} }
labels := map[string]string{} labels := prometheus.Labels{}
for label, valueExpr := range mapping.labels { for label, valueExpr := range mapping.labels {
value := mapping.regex.ExpandString([]byte{}, valueExpr, statsdMetric, matches) value := mapping.regex.ExpandString([]byte{}, valueExpr, statsdMetric, matches)
labels[label] = string(value) labels[label] = string(value)

View file

@ -11,16 +11,36 @@ import (
) )
var ( var (
eventStats = prometheus.NewCounter() eventStats = prometheus.NewCounterVec(
networkStats = prometheus.NewCounter() prometheus.CounterOpts{
configLoads = prometheus.NewCounter() Name: "statsd_bridge_events_count",
mappingsCount = prometheus.NewGauge() Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
networkStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_bridge_packets_count",
Help: "The total number of StatsD packets seen.",
},
[]string{"type"},
)
configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_bridge_config_reloads_count",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "statsd_bridge_loaded_mappings_count",
Help: "The number of configured metric mappings.",
})
) )
func init() { func init() {
prometheus.Register("statsd_bridge_events_count", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats) prometheus.MustRegister(eventStats)
prometheus.Register("statsd_bridge_packets_count", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats) prometheus.MustRegister(networkStats)
prometheus.Register("statsd_bridge_config_reloads_count", "The number of configuration reloads.", prometheus.NilLabels, configLoads) prometheus.MustRegister(configLoads)
prometheus.Register("statsd_bridge_loaded_mappings_count", "The number of configured metric mappings.", prometheus.NilLabels, mappingsCount) prometheus.MustRegister(mappingsCount)
} }