diff --git a/bridge.go b/bridge.go index 89447d0..bfb61de 100644 --- a/bridge.go +++ b/bridge.go @@ -7,86 +7,132 @@ package main import ( + "bytes" + "encoding/binary" "fmt" + "hash/fnv" "log" "net" "regexp" "strconv" "strings" + "github.com/prometheus/client_golang/model" "github.com/prometheus/client_golang/prometheus" ) +const ( + defaultHelp = "Metric autogenerated by statsd_bridge." + regErrF = "A change of configuration created inconsistent metrics for " + + "%q. You have to restart the statsd_bridge, and you should " + + "consider the effects on your monitoring setup. Error: %s" +) + var ( illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) + + hash = fnv.New64a() + strBuf bytes.Buffer // Used for hashing. + intBuf = make([]byte, 8) ) +// hashNameAndLabels returns a hash value of the provided name string and all +// the label names and values in the provided labels map. +// +// Not safe for concurrent use! (Uses a shared buffer and hasher to save on +// allocations.) +func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { + hash.Reset() + strBuf.Reset() + strBuf.WriteString(name) + hash.Write(strBuf.Bytes()) + binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) + hash.Write(intBuf) + return hash.Sum64() +} + type CounterContainer struct { - Elements map[string]prometheus.Counter + Elements map[uint64]prometheus.Counter } func NewCounterContainer() *CounterContainer { return &CounterContainer{ - Elements: make(map[string]prometheus.Counter), + Elements: make(map[uint64]prometheus.Counter), } } -func (c *CounterContainer) Get(metricName string) prometheus.Counter { - counter, ok := c.Elements[metricName] +func (c *CounterContainer) Get(metricName string, labels prometheus.Labels) prometheus.Counter { + hash := hashNameAndLabels(metricName, labels) + counter, ok := c.Elements[hash] if !ok { - counter = prometheus.NewCounter() - c.Elements[metricName] = counter - prometheus.Register(metricName, "", prometheus.NilLabels, counter) + counter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: metricName, + Help: defaultHelp, + ConstLabels: labels, + }) + c.Elements[hash] = counter + if _, err := prometheus.Register(counter); err != nil { + log.Fatalf(regErrF, metricName, err) + } } return counter } type GaugeContainer struct { - Elements map[string]prometheus.Gauge + Elements map[uint64]prometheus.Gauge } func NewGaugeContainer() *GaugeContainer { return &GaugeContainer{ - Elements: make(map[string]prometheus.Gauge), + Elements: make(map[uint64]prometheus.Gauge), } } -func (c *GaugeContainer) Get(metricName string) prometheus.Gauge { - gauge, ok := c.Elements[metricName] +func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels) prometheus.Gauge { + hash := hashNameAndLabels(metricName, labels) + gauge, ok := c.Elements[hash] if !ok { - gauge = prometheus.NewGauge() - c.Elements[metricName] = gauge - prometheus.Register(metricName, "", prometheus.NilLabels, gauge) + gauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: metricName, + Help: defaultHelp, + ConstLabels: labels, + }) + c.Elements[hash] = gauge + if _, err := prometheus.Register(gauge); err != nil { + log.Fatalf(regErrF, metricName, err) + } } return gauge } type SummaryContainer struct { - Elements map[string]prometheus.Histogram + Elements map[uint64]prometheus.Summary } func NewSummaryContainer() *SummaryContainer { return &SummaryContainer{ - Elements: make(map[string]prometheus.Histogram), + Elements: make(map[uint64]prometheus.Summary), } } -func (c *SummaryContainer) Get(metricName string) prometheus.Histogram { - summary, ok := c.Elements[metricName] +func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels) prometheus.Summary { + hash := hashNameAndLabels(metricName, labels) + summary, ok := c.Elements[hash] if !ok { - summary = prometheus.NewDefaultHistogram() - c.Elements[metricName] = summary - prometheus.Register(metricName, "", prometheus.NilLabels, summary) + summary = prometheus.NewSummary( + prometheus.SummaryOpts{ + Name: metricName, + Help: defaultHelp, + ConstLabels: labels, + }) + c.Elements[hash] = summary + if _, err := prometheus.Register(summary); err != nil { + log.Fatalf(regErrF, metricName, err) + } } return summary } -func (c *SummaryContainer) Flush() { - for _, summary := range c.Elements { - summary.ResetAll() - } -} - type Event interface { MetricName() string Value() float64 @@ -141,7 +187,7 @@ func (b *Bridge) Listen(e <-chan Events) { events := <-e for _, event := range events { metricName := "" - prometheusLabels := map[string]string{} + prometheusLabels := prometheus.Labels{} labels, present := b.mapper.getMapping(event.MetricName()) if present { @@ -157,32 +203,35 @@ func (b *Bridge) Listen(e <-chan Events) { switch event.(type) { case *CounterEvent: - counter := b.Counters.Get(metricName + "_counter") - counter.IncrementBy(prometheusLabels, event.Value()) + counter := b.Counters.Get( + metricName+"_counter", + prometheusLabels, + ) + counter.Add(event.Value()) - eventStats.Increment(map[string]string{"type": "counter"}) + eventStats.WithLabelValues("counter").Inc() case *GaugeEvent: - gauge := b.Gauges.Get(metricName + "_gauge") - gauge.Set(prometheusLabels, event.Value()) + gauge := b.Gauges.Get( + metricName+"_gauge", + prometheusLabels, + ) + gauge.Set(event.Value()) - eventStats.Increment(map[string]string{"type": "gauge"}) + eventStats.WithLabelValues("gauge").Inc() case *TimerEvent: - summary := b.Summaries.Get(metricName + "_timer") - summary.Add(prometheusLabels, event.Value()) + summary := b.Summaries.Get( + metricName+"_timer", + prometheusLabels, + ) + summary.Observe(event.Value()) - sum := b.Counters.Get(metricName + "_timer_total") - sum.IncrementBy(prometheusLabels, event.Value()) - - count := b.Counters.Get(metricName + "_timer_count") - count.Increment(prometheusLabels) - - eventStats.Increment(map[string]string{"type": "timer"}) + eventStats.WithLabelValues("timer").Inc() default: log.Println("Unsupported event type") - eventStats.Increment(map[string]string{"type": "illegal"}) + eventStats.WithLabelValues("illegal").Inc() } } } @@ -247,7 +296,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { elements := strings.Split(line, ":") if len(elements) < 2 { - networkStats.Increment(map[string]string{"type": "malformed_line"}) + networkStats.WithLabelValues("malformed_line").Inc() log.Println("Bad line from StatsD:", line) continue } @@ -257,7 +306,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { components := strings.Split(sample, "|") samplingFactor := 1.0 if len(components) < 2 || len(components) > 3 { - networkStats.Increment(map[string]string{"type": "malformed_component"}) + networkStats.WithLabelValues("malformed_component").Inc() log.Println("Bad component on line:", line) continue } @@ -265,25 +314,25 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Printf("Bad value %s on line: %s", valueStr, line) - networkStats.Increment(map[string]string{"type": "malformed_value"}) + networkStats.WithLabelValues("malformed_value").Inc() continue } if len(components) == 3 { if statType != "c" { log.Println("Illegal sampling factor for non-counter metric on line", line) - networkStats.Increment(map[string]string{"type": "illegal_sample_factor"}) + networkStats.WithLabelValues("illegal_sample_factor").Inc() } samplingStr := components[2] if samplingStr[0] != '@' { log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) - networkStats.Increment(map[string]string{"type": "invalid_sample_factor"}) + networkStats.WithLabelValues("invalid_sample_factor").Inc() continue } samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64) if err != nil { log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) - networkStats.Increment(map[string]string{"type": "invalid_sample_factor"}) + networkStats.WithLabelValues("invalid_sample_factor").Inc() continue } if samplingFactor == 0 { @@ -297,11 +346,11 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { event, err := buildEvent(statType, metric, value) if err != nil { log.Printf("Error building event on line %s: %s", line, err) - networkStats.Increment(map[string]string{"type": "illegal_event"}) + networkStats.WithLabelValues("illegal_event").Inc() continue } events = append(events, event) - networkStats.Increment(map[string]string{"type": "legal"}) + networkStats.WithLabelValues("legal").Inc() } } e <- events diff --git a/main.go b/main.go index ed6c76a..bacd52c 100644 --- a/main.go +++ b/main.go @@ -12,23 +12,20 @@ import ( "net" "net/http" "strconv" - "time" "github.com/howeyc/fsnotify" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/exp" ) var ( listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") statsdListeningAddress = flag.String("statsdListeningAddress", ":9125", "The UDP address on which to receive statsd metric lines.") mappingConfig = flag.String("mappingConfig", "", "Metric mapping configuration file name.") - summaryFlushInterval = flag.Duration("summaryFlushInterval", 15*time.Minute, "How frequently to reset all summary metrics.") ) func serveHTTP() { - exp.Handle(prometheus.ExpositionResource, prometheus.DefaultHandler) - http.ListenAndServe(*listeningAddress, exp.DefaultCoarseMux) + http.Handle("/metrics", prometheus.Handler()) + http.ListenAndServe(*listeningAddress, nil) } func udpAddrFromString(addr string) *net.UDPAddr { @@ -75,10 +72,10 @@ func watchConfig(fileName string, mapper *metricMapper) { err = mapper.initFromFile(fileName) if err != nil { log.Println("Error reloading config:", err) - configLoads.Increment(map[string]string{"outcome": "failure"}) + configLoads.WithLabelValues("failure").Inc() } else { log.Println("Config reloaded successfully") - configLoads.Increment(map[string]string{"outcome": "success"}) + configLoads.WithLabelValues("success").Inc() } // Re-add the file watcher since it can get lost on some changes. E.g. // saving a file with vim results in a RENAME-MODIFY-DELETE event @@ -119,10 +116,5 @@ func main() { go watchConfig(*mappingConfig, mapper) } bridge := NewBridge(mapper) - go func() { - for _ = range time.Tick(*summaryFlushInterval) { - bridge.Summaries.Flush() - } - }() bridge.Listen(events) } diff --git a/mapper.go b/mapper.go index 12162b2..37e58c1 100644 --- a/mapper.go +++ b/mapper.go @@ -25,7 +25,7 @@ var ( type metricMapping struct { regex *regexp.Regexp - labels map[string]string + labels prometheus.Labels } type metricMapper struct { @@ -45,7 +45,7 @@ func (m *metricMapper) initFromString(fileContents string) error { state := SEARCHING parsedMappings := []metricMapping{} - currentMapping := metricMapping{labels: map[string]string{}} + currentMapping := metricMapping{labels: prometheus.Labels{}} for i, line := range lines { line := strings.TrimSpace(line) @@ -78,7 +78,7 @@ func (m *metricMapper) initFromString(fileContents string) error { parsedMappings = append(parsedMappings, currentMapping) state = SEARCHING - currentMapping = metricMapping{labels: map[string]string{}} + currentMapping = metricMapping{labels: prometheus.Labels{}} continue } @@ -100,7 +100,7 @@ func (m *metricMapper) initFromString(fileContents string) error { defer m.mutex.Unlock() m.mappings = parsedMappings - mappingsCount.Set(prometheus.NilLabels, float64(len(parsedMappings))) + mappingsCount.Set(float64(len(parsedMappings))) return nil } @@ -113,7 +113,7 @@ func (m *metricMapper) initFromFile(fileName string) error { return m.initFromString(string(mappingStr)) } -func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string, present bool) { +func (m *metricMapper) getMapping(statsdMetric string) (labels prometheus.Labels, present bool) { m.mutex.Lock() defer m.mutex.Unlock() @@ -123,7 +123,7 @@ func (m *metricMapper) getMapping(statsdMetric string) (labels map[string]string continue } - labels := map[string]string{} + labels := prometheus.Labels{} for label, valueExpr := range mapping.labels { value := mapping.regex.ExpandString([]byte{}, valueExpr, statsdMetric, matches) labels[label] = string(value) diff --git a/telemetry.go b/telemetry.go index 6c7bb3d..965d0f2 100644 --- a/telemetry.go +++ b/telemetry.go @@ -11,16 +11,36 @@ import ( ) var ( - eventStats = prometheus.NewCounter() - networkStats = prometheus.NewCounter() - configLoads = prometheus.NewCounter() - mappingsCount = prometheus.NewGauge() + eventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_bridge_events_count", + Help: "The total number of StatsD events seen.", + }, + []string{"type"}, + ) + networkStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_bridge_packets_count", + Help: "The total number of StatsD packets seen.", + }, + []string{"type"}, + ) + configLoads = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_bridge_config_reloads_count", + Help: "The number of configuration reloads.", + }, + []string{"outcome"}, + ) + mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "statsd_bridge_loaded_mappings_count", + Help: "The number of configured metric mappings.", + }) ) func init() { - prometheus.Register("statsd_bridge_events_count", "The total number of StatsD events seen.", prometheus.NilLabels, eventStats) - prometheus.Register("statsd_bridge_packets_count", "The total number of StatsD packets seen.", prometheus.NilLabels, networkStats) - prometheus.Register("statsd_bridge_config_reloads_count", "The number of configuration reloads.", prometheus.NilLabels, configLoads) - prometheus.Register("statsd_bridge_loaded_mappings_count", "The number of configured metric mappings.", prometheus.NilLabels, mappingsCount) - + prometheus.MustRegister(eventStats) + prometheus.MustRegister(networkStats) + prometheus.MustRegister(configLoads) + prometheus.MustRegister(mappingsCount) }