Address review comments.

This commit is contained in:
Julius Volz 2013-07-02 20:10:01 +02:00
parent ff26260553
commit ef07f34fe7

91
main.go
View file

@ -8,24 +8,25 @@ package main
import ( import (
"flag" "flag"
"fmt"
"log" "log"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/exp" "github.com/prometheus/client_golang/prometheus/exp"
) )
var listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.") var (
var statsdListeningAddress = flag.String("statsdListeningAddress", ":8126", "The UDP address on which to receive statsd metric lines.") listeningAddress = flag.String("listeningAddress", ":8080", "The address on which to expose generated Prometheus metrics.")
statsdListeningAddress = flag.String("statsdListeningAddress", ":8126", "The UDP address on which to receive statsd metric lines.")
summaryFlushInterval = flag.Duration("summaryFlushInterval", time.Hour, "How frequently to reset all summary metrics.")
)
type CounterContainer struct { type CounterContainer struct {
sync.RWMutex
Elements map[string]prometheus.Counter Elements map[string]prometheus.Counter
} }
@ -36,9 +37,6 @@ func NewCounterContainer() *CounterContainer {
} }
func (c *CounterContainer) Get(metricName string) prometheus.Counter { func (c *CounterContainer) Get(metricName string) prometheus.Counter {
c.Lock()
defer c.Unlock()
counter, ok := c.Elements[metricName] counter, ok := c.Elements[metricName]
if !ok { if !ok {
counter = prometheus.NewCounter() counter = prometheus.NewCounter()
@ -49,8 +47,6 @@ func (c *CounterContainer) Get(metricName string) prometheus.Counter {
} }
type GaugeContainer struct { type GaugeContainer struct {
sync.RWMutex
Elements map[string]prometheus.Gauge Elements map[string]prometheus.Gauge
} }
@ -61,9 +57,6 @@ func NewGaugeContainer() *GaugeContainer {
} }
func (c *GaugeContainer) Get(metricName string) prometheus.Gauge { func (c *GaugeContainer) Get(metricName string) prometheus.Gauge {
c.Lock()
defer c.Unlock()
gauge, ok := c.Elements[metricName] gauge, ok := c.Elements[metricName]
if !ok { if !ok {
gauge = prometheus.NewGauge() gauge = prometheus.NewGauge()
@ -74,11 +67,7 @@ func (c *GaugeContainer) Get(metricName string) prometheus.Gauge {
} }
type SummaryContainer struct { type SummaryContainer struct {
sync.RWMutex
Elements map[string]prometheus.Histogram Elements map[string]prometheus.Histogram
ResetInterval time.Duration
} }
func NewSummaryContainer() *SummaryContainer { func NewSummaryContainer() *SummaryContainer {
@ -88,9 +77,6 @@ func NewSummaryContainer() *SummaryContainer {
} }
func (c *SummaryContainer) Get(metricName string) prometheus.Histogram { func (c *SummaryContainer) Get(metricName string) prometheus.Histogram {
c.Lock()
defer c.Unlock()
summary, ok := c.Elements[metricName] summary, ok := c.Elements[metricName]
if !ok { if !ok {
summary = prometheus.NewDefaultHistogram() summary = prometheus.NewDefaultHistogram()
@ -100,6 +86,12 @@ func (c *SummaryContainer) Get(metricName string) prometheus.Histogram {
return summary return summary
} }
func (c *SummaryContainer) Flush() {
for _, summary := range c.Elements {
summary.ResetAll()
}
}
type Event interface { type Event interface {
MetricName() string MetricName() string
Value() float64 Value() float64
@ -179,6 +171,30 @@ type StatsDListener struct {
conn *net.UDPConn conn *net.UDPConn
} }
func buildEvent(statType, metric string, value float64) (Event, error) {
switch statType {
case "c":
return &CounterEvent{
metricName: metric,
value: float64(value),
}, nil
case "g":
return &GaugeEvent{
metricName: metric,
value: float64(value),
}, nil
case "ms":
return &TimerEvent{
metricName: metric,
value: float64(value),
}, nil
case "s":
return nil, fmt.Errorf("No support for StatsD sets")
default:
return nil, fmt.Errorf("Bad stat type %s", statType)
}
}
func (l *StatsDListener) Listen(e chan<- Events) { func (l *StatsDListener) Listen(e chan<- Events) {
// TODO: evaluate proper size according to MTU // TODO: evaluate proper size according to MTU
var buf [512]byte var buf [512]byte
@ -214,7 +230,7 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
continue continue
} }
valueStr, statType := components[0], components[1] valueStr, statType := components[0], components[1]
value, err := strconv.Atoi(valueStr) value, err := strconv.ParseFloat(valueStr, 64)
if err != nil { if err != nil {
log.Printf("Bad value %s on line: %s", valueStr, line) log.Printf("Bad value %s on line: %s", valueStr, line)
continue continue
@ -239,35 +255,17 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line) log.Printf("Invalid zero sampling factor %s on line %s, setting to 1", samplingStr, line)
samplingFactor = 1 samplingFactor = 1
} }
value /= samplingFactor
} }
var event Event event, err := buildEvent(statType, metric, value)
switch statType { if err != nil {
case "c": log.Printf("Error building event on line %s: %s", line, err)
event = &CounterEvent{ continue
metricName: metric,
value: float64(value) / samplingFactor,
} }
case "g":
event = &GaugeEvent{
metricName: metric,
value: float64(value),
}
case "ms":
event = &TimerEvent{
metricName: metric,
value: float64(value),
}
case "s":
log.Println("No support for StatsD sets in line", line)
default:
log.Printf("Bad stat type %s on line: %s", statType, line)
}
if event != nil {
events = append(events, event) events = append(events, event)
} }
} }
}
e <- events e <- events
} }
@ -319,5 +317,10 @@ func main() {
go l.Listen(events) go l.Listen(events)
bridge := NewBridge() bridge := NewBridge()
go func() {
for _ = range time.Tick(*summaryFlushInterval) {
bridge.Summaries.Flush()
}
}()
bridge.Listen(events) bridge.Listen(events)
} }