// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"net"
	"regexp"
	"strconv"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/log"
)

const (
	// defaultHelp is the help text given to every metric that the containers
	// in this file create.
	defaultHelp = "Metric autogenerated by statsd_bridge."
	// regErrF is the format string (metric name, error) logged when
	// registering a newly created metric fails.
	regErrF = "A change of configuration created inconsistent metrics for " +
		"%q. You have to restart the statsd_bridge, and you should " +
		"consider the effects on your monitoring setup. Error: %s"
)

var (
	// illegalCharsRE matches every character that is not an ASCII letter,
	// an ASCII digit, or an underscore.
	illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

	// hash, strBuf and intBuf are shared scratch state for
	// hashNameAndLabels: a 64-bit FNV-1a hasher, a buffer for the name
	// bytes, and an 8-byte buffer for the label signature.
	hash   = fnv.New64a()
	strBuf bytes.Buffer // Used for hashing.
	intBuf = make([]byte, 8)
)

// hashNameAndLabels returns a hash value of the provided name string and all
// the label names and values in the provided labels map.
//
// Not safe for concurrent use! (Uses a shared buffer and hasher to save on
// allocations.)
func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { hash.Reset() strBuf.Reset() strBuf.WriteString(name) hash.Write(strBuf.Bytes()) binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) hash.Write(intBuf) return hash.Sum64() } type CounterContainer struct { Elements map[uint64]prometheus.Counter } func NewCounterContainer() *CounterContainer { return &CounterContainer{ Elements: make(map[uint64]prometheus.Counter), } } func (c *CounterContainer) Get(metricName string, labels prometheus.Labels) prometheus.Counter { hash := hashNameAndLabels(metricName, labels) counter, ok := c.Elements[hash] if !ok { counter = prometheus.NewCounter(prometheus.CounterOpts{ Name: metricName, Help: defaultHelp, ConstLabels: labels, }) c.Elements[hash] = counter if err := prometheus.Register(counter); err != nil { log.Fatalf(regErrF, metricName, err) } } return counter } type GaugeContainer struct { Elements map[uint64]prometheus.Gauge } func NewGaugeContainer() *GaugeContainer { return &GaugeContainer{ Elements: make(map[uint64]prometheus.Gauge), } } func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels) prometheus.Gauge { hash := hashNameAndLabels(metricName, labels) gauge, ok := c.Elements[hash] if !ok { gauge = prometheus.NewGauge(prometheus.GaugeOpts{ Name: metricName, Help: defaultHelp, ConstLabels: labels, }) c.Elements[hash] = gauge if err := prometheus.Register(gauge); err != nil { log.Fatalf(regErrF, metricName, err) } } return gauge } type SummaryContainer struct { Elements map[uint64]prometheus.Summary } func NewSummaryContainer() *SummaryContainer { return &SummaryContainer{ Elements: make(map[uint64]prometheus.Summary), } } func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels) prometheus.Summary { hash := hashNameAndLabels(metricName, labels) summary, ok := c.Elements[hash] if !ok { summary = prometheus.NewSummary( prometheus.SummaryOpts{ Name: metricName, Help: defaultHelp, ConstLabels: labels, }) 
c.Elements[hash] = summary if err := prometheus.Register(summary); err != nil { log.Fatalf(regErrF, metricName, err) } } return summary } type Event interface { MetricName() string Value() float64 } type CounterEvent struct { metricName string value float64 } func (c *CounterEvent) MetricName() string { return c.metricName } func (c *CounterEvent) Value() float64 { return c.value } type GaugeEvent struct { metricName string value float64 } func (g *GaugeEvent) MetricName() string { return g.metricName } func (g *GaugeEvent) Value() float64 { return g.value } type TimerEvent struct { metricName string value float64 } func (t *TimerEvent) MetricName() string { return t.metricName } func (t *TimerEvent) Value() float64 { return t.value } type Events []Event type Bridge struct { Counters *CounterContainer Gauges *GaugeContainer Summaries *SummaryContainer mapper *metricMapper } func escapeMetricName(metricName string) string { // If a metric starts with a digit, prepend an underscore. if metricName[0] >= '0' && metricName[0] <= '9' { metricName = "_" + metricName } // Replace all illegal metric chars with underscores. 
metricName = illegalCharsRE.ReplaceAllString(metricName, "_") return metricName } func (b *Bridge) Listen(e <-chan Events) { for { events := <-e for _, event := range events { metricName := "" prometheusLabels := prometheus.Labels{} labels, present := b.mapper.getMapping(event.MetricName()) if present { metricName = labels["name"] for label, value := range labels { if label != "name" { prometheusLabels[label] = value } } } else { metricName = escapeMetricName(event.MetricName()) } switch event.(type) { case *CounterEvent: counter := b.Counters.Get( metricName+"_counter", prometheusLabels, ) counter.Add(event.Value()) eventStats.WithLabelValues("counter").Inc() case *GaugeEvent: gauge := b.Gauges.Get( metricName+"_gauge", prometheusLabels, ) gauge.Set(event.Value()) eventStats.WithLabelValues("gauge").Inc() case *TimerEvent: summary := b.Summaries.Get( metricName+"_timer", prometheusLabels, ) summary.Observe(event.Value()) eventStats.WithLabelValues("timer").Inc() default: log.Println("Unsupported event type") eventStats.WithLabelValues("illegal").Inc() } } } } func NewBridge(mapper *metricMapper) *Bridge { return &Bridge{ Counters: NewCounterContainer(), Gauges: NewGaugeContainer(), Summaries: NewSummaryContainer(), mapper: mapper, } } type StatsDListener struct { conn *net.UDPConn } func buildEvent(statType, metric string, value float64) (Event, error) { switch statType { case "c": return &CounterEvent{ metricName: metric, value: float64(value), }, nil case "g": return &GaugeEvent{ metricName: metric, value: float64(value), }, nil case "ms": return &TimerEvent{ metricName: metric, value: float64(value), }, nil case "s": return nil, fmt.Errorf("No support for StatsD sets") default: return nil, fmt.Errorf("Bad stat type %s", statType) } } func (l *StatsDListener) Listen(e chan<- Events) { // TODO: evaluate proper size according to MTU var buf [512]byte for { n, _, err := l.conn.ReadFromUDP(buf[0:]) if err != nil { log.Fatal(err) } l.handlePacket(buf[0:n], e) } } 
func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { lines := strings.Split(string(packet), "\n") events := Events{} for _, line := range lines { if line == "" { continue } elements := strings.Split(line, ":") if len(elements) < 2 { networkStats.WithLabelValues("malformed_line").Inc() log.Println("Bad line from StatsD:", line) continue } metric := elements[0] samples := elements[1:] for _, sample := range samples { components := strings.Split(sample, "|") samplingFactor := 1.0 if len(components) < 2 || len(components) > 3 { networkStats.WithLabelValues("malformed_component").Inc() log.Println("Bad component on line:", line) continue } valueStr, statType := components[0], components[1] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Printf("Bad value %s on line: %s", valueStr, line) networkStats.WithLabelValues("malformed_value").Inc() continue } if len(components) == 3 { if statType != "c" { log.Println("Illegal sampling factor for non-counter metric on line", line) networkStats.WithLabelValues("illegal_sample_factor").Inc() } samplingStr := components[2] if samplingStr[0] != '@' { log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) networkStats.WithLabelValues("invalid_sample_factor").Inc() continue } samplingFactor, err = strconv.ParseFloat(samplingStr[1:], 64) if err != nil { log.Printf("Invalid sampling factor %s on line %s", samplingStr, line) networkStats.WithLabelValues("invalid_sample_factor").Inc() continue } if samplingFactor == 0 { // This should never happen, but avoid division by zero if it does. 
log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line) samplingFactor = 1 } value /= samplingFactor } event, err := buildEvent(statType, metric, value) if err != nil { log.Printf("Error building event on line %s: %s", line, err) networkStats.WithLabelValues("illegal_event").Inc() continue } events = append(events, event) networkStats.WithLabelValues("legal").Inc() } } e <- events }