diff --git a/main.go b/main.go index 72bfd8d..bfd82c1 100644 --- a/main.go +++ b/main.go @@ -8,24 +8,25 @@ package main import ( "flag" + "fmt" "log" "net" "net/http" "strconv" "strings" - "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/exp" ) -var listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") -var statsdListeningAddress = flag.String("statsdListeningAddress", ":8126", "The UDP address on which to receive statsd metric lines.") +var ( + listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") + statsdListeningAddress = flag.String("statsdListeningAddress", ":8126", "The UDP address on which to receive statsd metric lines.") + summaryFlushInterval = flag.Duration("summaryFlushInterval", time.Hour, "How frequently to reset all summary metrics.") +) type CounterContainer struct { - sync.RWMutex - Elements map[string]prometheus.Counter } @@ -36,9 +37,6 @@ func NewCounterContainer() *CounterContainer { } func (c *CounterContainer) Get(metricName string) prometheus.Counter { - c.Lock() - defer c.Unlock() - counter, ok := c.Elements[metricName] if !ok { counter = prometheus.NewCounter() @@ -49,8 +47,6 @@ func (c *CounterContainer) Get(metricName string) prometheus.Counter { } type GaugeContainer struct { - sync.RWMutex - Elements map[string]prometheus.Gauge } @@ -61,9 +57,6 @@ func NewGaugeContainer() *GaugeContainer { } func (c *GaugeContainer) Get(metricName string) prometheus.Gauge { - c.Lock() - defer c.Unlock() - gauge, ok := c.Elements[metricName] if !ok { gauge = prometheus.NewGauge() @@ -74,11 +67,7 @@ func (c *GaugeContainer) Get(metricName string) prometheus.Gauge { } type SummaryContainer struct { - sync.RWMutex - Elements map[string]prometheus.Histogram - - ResetInterval time.Duration } func NewSummaryContainer() *SummaryContainer { @@ -88,9 +77,6 @@ func NewSummaryContainer() *SummaryContainer { } func (c *SummaryContainer) Get(metricName string) prometheus.Histogram { - c.Lock() - defer c.Unlock() - summary, ok := c.Elements[metricName] if !ok { summary = prometheus.NewDefaultHistogram() @@ -100,6 +86,12 @@ func (c *SummaryContainer) Get(metricName string) prometheus.Histogram { return summary } +func (c *SummaryContainer) Flush() { + for _, summary := range c.Elements { + summary.ResetAll() + } +} + type Event interface { MetricName() string Value() float64 @@ -179,6 +171,30 @@ type StatsDListener struct { conn *net.UDPConn } +func buildEvent(statType, metric string, value float64) (Event, error) { + switch statType { + case "c": + return &CounterEvent{ + metricName: metric, + value: float64(value), + }, nil + case "g": + return &GaugeEvent{ + metricName: metric, + value: float64(value), + }, nil + case "ms": + return &TimerEvent{ + metricName: metric, + value: float64(value), + }, nil + case "s": + return nil, fmt.Errorf("No support for StatsD sets") + default: + return nil, fmt.Errorf("Bad stat type %s", statType) + } +} + func (l *StatsDListener) Listen(e chan<- Events) { // TODO: evaluate proper size according to MTU var buf [512]byte @@ -214,7 +230,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { continue } valueStr, statType := components[0], components[1] - value, err := strconv.Atoi(valueStr) + value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Printf("Bad value %s on line: %s", valueStr, line) continue @@ -239,33 +255,15 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line) samplingFactor = 1 } + value /= samplingFactor } - var event Event - switch statType { - case "c": - event = &CounterEvent{ - metricName: metric, - value: float64(value) / samplingFactor, - } - case "g": - event = &GaugeEvent{ - metricName: metric, - value: float64(value), - } - case "ms": - event = &TimerEvent{ - metricName: metric, - value: float64(value), - } - case "s": - log.Println("No support for StatsD sets in line", line) - default: - log.Printf("Bad stat type %s on line: %s", statType, line) - } - if event != nil { - events = append(events, event) + event, err := buildEvent(statType, metric, value) + if err != nil { + log.Printf("Error building event on line %s: %s", line, err) + continue } + events = append(events, event) } } e <- events @@ -319,5 +317,10 @@ func main() { go l.Listen(events) bridge := NewBridge() + go func() { + for _ = range time.Tick(*summaryFlushInterval) { + bridge.Summaries.Flush() + } + }() bridge.Listen(events) }