squash! Updated structs and tests.

squash! Updated structs and tests.

Updated structs and tests.

Signed-off-by: Frank Davidson <davidfr@americas.manulife.net>
Signed-off-by: Frank Davidson <ffdavidson@gmail.com>
Signed-off-by: Frank Davidson <frank_davidson@manulife.com>
This commit is contained in:
Frank Davidson 2020-04-08 14:59:19 -04:00
parent 4b3b9ba207
commit 5ec58a32c2
9 changed files with 594 additions and 126 deletions

View file

@ -14,12 +14,20 @@
package main package main
import ( import (
"fmt"
"net"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
) )
func TestHandlePacket(t *testing.T) { func TestHandlePacket(t *testing.T) {
@ -417,11 +425,35 @@ func TestHandlePacket(t *testing.T) {
}, },
} }
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{listener.StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
events := make(chan event.Events, 32) events := make(chan event.Events, 32)
l.SetEventHandler(&event.UnbufferedEventHandler{C: events}) l.SetEventHandler(&event.UnbufferedEventHandler{C: events})
for i, scenario := range scenarios { for i, scenario := range scenarios {
l.HandlePacket([]byte(scenario.in), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) l.HandlePacket([]byte(scenario.in))
le := len(events) le := len(events)
// Flatten actual events. // Flatten actual events.
@ -442,3 +474,225 @@ func TestHandlePacket(t *testing.T) {
} }
} }
} }
type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
}
type mockStatsDTCPListener struct {
listener.StatsDTCPListener
log.Logger
}
func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses.
lc, err := net.ListenTCP("tcp4", nil)
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err))
}
defer lc.Close()
go func() {
cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr))
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err))
}
defer cc.Close()
n, err := cc.Write(packet)
if err != nil || n != len(packet) {
panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n))
}
}()
sc, err := lc.AcceptTCP()
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
}
ml.HandleConn(sc)
}
// TestTtlExpiration validates expiration of time series.
// foobar metric without mapping should expire with default ttl of 1s
// bazqux metric should expire with ttl of 2s
func TestTtlExpiration(t *testing.T) {
// Mock a time.NewTicker
tickerCh := make(chan time.Time)
clock.ClockInstance = &clock.Clock{
TickerCh: tickerCh,
}
config := `
defaults:
ttl: 1s
mappings:
- match: bazqux.*
name: bazqux
ttl: 2s
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
ev := event.Events{
// event with default ttl = 1s
&event.GaugeEvent{
GMetricName: "foobar",
GValue: 200,
},
// event with ttl = 2s from a mapping
&event.TimerEvent{
TMetricName: "bazqux.main",
TValue: 42000,
},
}
var metrics []*dto.MetricFamily
var foobarValue *float64
var bazquxValue *float64
// Step 1. Send events with statsd metrics.
// Send empty Events to wait for events are handled.
// saveLabelValues will use fake instant as a lastRegisteredAt time.
clock.ClockInstance.Instant = time.Unix(0, 0)
events <- ev
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue == nil || bazquxValue == nil {
t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered")
}
if *foobarValue != 200 {
t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}
// Step 2. Increase Instant to emulate metrics expiration after 1s
clock.ClockInstance.Instant = time.Unix(1, 10)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should be expired")
}
if bazquxValue == nil {
t.Fatalf("Summary `bazqux` should be gathered")
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}
// Step 3. Increase Instant to emulate metrics expiration after 2s
clock.ClockInstance.Instant = time.Unix(2, 200)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if bazquxValue != nil {
t.Fatalf("Summary `bazqux` should be expired")
}
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should not be gathered after expiration")
}
}
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
// Method returns a value or nil if metric is not found.
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
var metricFamily *dto.MetricFamily
for _, m := range metrics {
if *m.Name == name {
metricFamily = m
break
}
}
if metricFamily == nil {
return nil
}
var metric *dto.Metric
labelStr := fmt.Sprintf("%v", labels)
for _, m := range metricFamily.Metric {
l := labelPairsAsLabels(m.GetLabel())
ls := fmt.Sprintf("%v", l)
if labelStr == ls {
metric = m
break
}
}
if metric == nil {
return nil
}
var value float64
if metric.Gauge != nil {
value = metric.Gauge.GetValue()
return &value
}
if metric.Counter != nil {
value = metric.Counter.GetValue()
return &value
}
if metric.Histogram != nil {
value = metric.Histogram.GetSampleSum()
return &value
}
if metric.Summary != nil {
value = metric.Summary.GetSampleSum()
return &value
}
if metric.Untyped != nil {
value = metric.Untyped.GetValue()
return &value
}
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
}
func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
labels = prometheus.Labels{}
for _, pair := range pairs {
if pair.Name == nil {
continue
}
value := ""
if pair.Value != nil {
value = *pair.Value
}
labels[*pair.Name] = value
}
return
}

View file

@ -50,7 +50,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
for i := 0; i < times; i++ { for i := 0; i < times; i++ {
for _, line := range bytesInput { for _, line := range bytesInput {
l.HandlePacket([]byte(line), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) l.HandlePacket([]byte(line))
} }
} }
} }
@ -142,7 +142,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err) b.Fatalf("Config load error: %s %s", config, err)
} }
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ec := make(chan event.Events, 1000) ec := make(chan event.Events, 1000)
go func() { go func() {
@ -152,6 +152,6 @@ mappings:
close(ec) close(ec)
}() }()
ex.Listen(ec, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(ec)
} }
} }

49
main.go
View file

@ -32,11 +32,11 @@ import (
"github.com/prometheus/common/version" "github.com/prometheus/common/version"
"gopkg.in/alecthomas/kingpin.v2" "gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/statsd_exporter/pkg/address"
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/util"
) )
const ( const (
@ -58,7 +58,8 @@ var (
Help: "Number of times events were flushed to exporter", Help: "Number of times events were flushed to exporter",
}, },
) )
eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{ eventsUnmapped = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total", Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.", Help: "The total number of StatsD events no mapping was found for.",
}) })
@ -294,7 +295,7 @@ func main() {
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)
if *statsdListenUDP != "" { if *statsdListenUDP != "" {
udpListenAddr, err := util.UDPAddrFromString(*statsdListenUDP) udpListenAddr, err := address.UDPAddrFromString(*statsdListenUDP)
if err != nil { if err != nil {
level.Error(logger).Log("msg", "invalid UDP listen address", "address", *statsdListenUDP, "error", err) level.Error(logger).Log("msg", "invalid UDP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1) os.Exit(1)
@ -313,12 +314,24 @@ func main() {
} }
} }
ul := &listener.StatsDUDPListener{Conn: uconn, EventHandler: eventQueue, Logger: logger} ul := &listener.StatsDUDPListener{
go ul.Listen(udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}
go ul.Listen()
} }
if *statsdListenTCP != "" { if *statsdListenTCP != "" {
tcpListenAddr, err := util.TCPAddrFromString(*statsdListenTCP) tcpListenAddr, err := address.TCPAddrFromString(*statsdListenTCP)
if err != nil { if err != nil {
level.Error(logger).Log("msg", "invalid TCP listen address", "address", *statsdListenUDP, "error", err) level.Error(logger).Log("msg", "invalid TCP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1) os.Exit(1)
@ -330,8 +343,22 @@ func main() {
} }
defer tconn.Close() defer tconn.Close()
tl := &listener.StatsDTCPListener{Conn: tconn, EventHandler: eventQueue, Logger: logger} tl := &listener.StatsDTCPListener{
go tl.Listen(linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, *sampleErrors, samplesReceived, tagErrors, tagsReceived) Conn: tconn,
EventHandler: eventQueue,
Logger: logger,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}
go tl.Listen()
} }
if *statsdListenUnixgram != "" { if *statsdListenUnixgram != "" {
@ -360,7 +387,7 @@ func main() {
} }
ul := &listener.StatsDUnixgramListener{Conn: uxgconn, EventHandler: eventQueue, Logger: logger} ul := &listener.StatsDUnixgramListener{Conn: uxgconn, EventHandler: eventQueue, Logger: logger}
go ul.Listen(unixgramPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) go ul.Listen()
// if it's an abstract unix domain socket, it won't exist on fs // if it's an abstract unix domain socket, it won't exist on fs
// so we can't chmod it either // so we can't chmod it either
@ -403,12 +430,12 @@ func main() {
go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
exporter := exporter.NewExporter(mapper, logger) exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM) signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
go exporter.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) go exporter.Listen(events)
<-signals <-signals
} }

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package util package address
import ( import (
"fmt" "fmt"

View file

@ -69,12 +69,14 @@ type EventQueue struct {
C chan Events C chan Events
q Events q Events
m sync.Mutex m sync.Mutex
flushThreshold int
flushTicker *time.Ticker flushTicker *time.Ticker
flushThreshold int
flushInterval time.Duration
eventsFlushed prometheus.Counter
} }
type EventHandler interface { type EventHandler interface {
Queue(event Events, eventsFlushed *prometheus.Counter) Queue(event Events)
} }
func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue { func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue {
@ -84,38 +86,39 @@ func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duratio
flushThreshold: flushThreshold, flushThreshold: flushThreshold,
flushTicker: ticker, flushTicker: ticker,
q: make([]Event, 0, flushThreshold), q: make([]Event, 0, flushThreshold),
eventsFlushed: eventsFlushed,
} }
go func() { go func() {
for { for {
<-ticker.C <-ticker.C
eq.Flush(eventsFlushed) eq.Flush()
} }
}() }()
return eq return eq
} }
func (eq *EventQueue) Queue(events Events, eventsFlushed *prometheus.Counter) { func (eq *EventQueue) Queue(events Events) {
eq.m.Lock() eq.m.Lock()
defer eq.m.Unlock() defer eq.m.Unlock()
for _, e := range events { for _, e := range events {
eq.q = append(eq.q, e) eq.q = append(eq.q, e)
if len(eq.q) >= eq.flushThreshold { if len(eq.q) >= eq.flushThreshold {
eq.FlushUnlocked(*eventsFlushed) eq.FlushUnlocked()
} }
} }
} }
func (eq *EventQueue) Flush(eventsFlushed prometheus.Counter) { func (eq *EventQueue) Flush() {
eq.m.Lock() eq.m.Lock()
defer eq.m.Unlock() defer eq.m.Unlock()
eq.FlushUnlocked(eventsFlushed) eq.FlushUnlocked()
} }
func (eq *EventQueue) FlushUnlocked(eventsFlushed prometheus.Counter) { func (eq *EventQueue) FlushUnlocked() {
eq.C <- eq.q eq.C <- eq.q
eq.q = make([]Event, 0, cap(eq.q)) eq.q = make([]Event, 0, cap(eq.q))
eventsFlushed.Inc() eq.eventsFlushed.Inc()
} }
func (eq *EventQueue) Len() int { func (eq *EventQueue) Len() int {
@ -129,6 +132,6 @@ type UnbufferedEventHandler struct {
C chan Events C chan Events
} }
func (ueh *UnbufferedEventHandler) Queue(events Events, eventsFlushed *prometheus.Counter) { func (ueh *UnbufferedEventHandler) Queue(events Events) {
ueh.C <- events ueh.C <- events
} }

View file

@ -11,23 +11,30 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package main package event
import ( import (
"testing" "testing"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event" )
var eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
) )
func TestEventThresholdFlush(t *testing.T) { func TestEventThresholdFlush(t *testing.T) {
c := make(chan event.Events, 100) c := make(chan Events, 100)
// We're not going to flush during this test, so the duration doesn't matter. // We're not going to flush during this test, so the duration doesn't matter.
eq := event.NewEventQueue(c, 5, time.Second, eventsFlushed) eq := NewEventQueue(c, 5, time.Second, eventsFlushed)
e := make(event.Events, 13) e := make(Events, 13)
go func() { go func() {
eq.Queue(e, &eventsFlushed) eq.Queue(e)
}() }()
batch := <-c batch := <-c
@ -52,10 +59,10 @@ func TestEventIntervalFlush(t *testing.T) {
} }
clock.ClockInstance.Instant = time.Unix(0, 0) clock.ClockInstance.Instant = time.Unix(0, 0)
c := make(chan event.Events, 100) c := make(chan Events, 100)
eq := event.NewEventQueue(c, 1000, time.Second*1000, eventsFlushed) eq := NewEventQueue(c, 1000, time.Second*1000, eventsFlushed)
e := make(event.Events, 10) e := make(Events, 10)
eq.Queue(e, &eventsFlushed) eq.Queue(e)
if eq.Len() != 10 { if eq.Len() != 10 {
t.Fatal("Expected 10 events to be queued, but got", eq.Len()) t.Fatal("Expected 10 events to be queued, but got", eq.Len())

View file

@ -35,12 +35,17 @@ type Exporter struct {
Mapper *mapper.MetricMapper Mapper *mapper.MetricMapper
Registry *registry.Registry Registry *registry.Registry
Logger log.Logger Logger log.Logger
EventsActions *prometheus.CounterVec
EventsUnmapped prometheus.Counter
ErrorEventStats *prometheus.CounterVec
EventStats *prometheus.CounterVec
ConflictingEventStats *prometheus.CounterVec
MetricsCount *prometheus.GaugeVec
} }
// Listen handles all events sent to the given channel sequentially. It // Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed. // terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan event.Events, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, func (b *Exporter) Listen(e <-chan event.Events) {
errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) {
removeStaleMetricsTicker := clock.NewTicker(time.Second) removeStaleMetricsTicker := clock.NewTicker(time.Second)
@ -55,15 +60,14 @@ func (b *Exporter) Listen(e <-chan event.Events, eventsActions *prometheus.Count
return return
} }
for _, event := range events { for _, event := range events {
b.handleEvent(event, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) b.handleEvent(event)
} }
} }
} }
} }
// handleEvent processes a single Event according to the configured mapping. // handleEvent processes a single Event according to the configured mapping.
func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, func (b *Exporter) handleEvent(thisEvent event.Event) {
errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) {
mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType()) mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType())
if mapping == nil { if mapping == nil {
@ -74,7 +78,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
} }
if mapping.Action == mapper.ActionTypeDrop { if mapping.Action == mapper.ActionTypeDrop {
eventsActions.WithLabelValues("drop").Inc() b.EventsActions.WithLabelValues("drop").Inc()
return return
} }
@ -89,16 +93,16 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
if present { if present {
if mapping.Name == "" { if mapping.Name == "" {
level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match) level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match)
errorEventStats.WithLabelValues("empty_metric_name").Inc() b.ErrorEventStats.WithLabelValues("empty_metric_name").Inc()
return return
} }
metricName = mapper.EscapeMetricName(mapping.Name) metricName = mapper.EscapeMetricName(mapping.Name)
for label, value := range labels { for label, value := range labels {
prometheusLabels[label] = value prometheusLabels[label] = value
} }
eventsActions.WithLabelValues(string(mapping.Action)).Inc() b.EventsActions.WithLabelValues(string(mapping.Action)).Inc()
} else { } else {
eventsUnmapped.Inc() b.EventsUnmapped.Inc()
metricName = mapper.EscapeMetricName(thisEvent.MetricName()) metricName = mapper.EscapeMetricName(thisEvent.MetricName())
} }
@ -108,21 +112,21 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
// will cause the exporter to panic. Instead we will warn and continue to the next event. // will cause the exporter to panic. Instead we will warn and continue to the next event.
if thisEvent.Value() < 0.0 { if thisEvent.Value() < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value())
errorEventStats.WithLabelValues("illegal_negative_counter").Inc() b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return return
} }
counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, metricsCount) counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
counter.Add(thisEvent.Value()) counter.Add(thisEvent.Value())
eventStats.WithLabelValues("counter").Inc() b.EventStats.WithLabelValues("counter").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("counter").Inc() b.ConflictingEventStats.WithLabelValues("counter").Inc()
} }
case *event.GaugeEvent: case *event.GaugeEvent:
gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, metricsCount) gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
if ev.GRelative { if ev.GRelative {
@ -130,10 +134,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
} else { } else {
gauge.Set(thisEvent.Value()) gauge.Set(thisEvent.Value())
} }
eventStats.WithLabelValues("gauge").Inc() b.EventStats.WithLabelValues("gauge").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("gauge").Inc() b.ConflictingEventStats.WithLabelValues("gauge").Inc()
} }
case *event.TimerEvent: case *event.TimerEvent:
@ -147,23 +151,23 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
switch t { switch t {
case mapper.TimerTypeHistogram: case mapper.TimerTypeHistogram:
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, metricsCount) histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc() b.EventStats.WithLabelValues("timer").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc() b.ConflictingEventStats.WithLabelValues("timer").Inc()
} }
case mapper.TimerTypeDefault, mapper.TimerTypeSummary: case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, metricsCount) summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil { if err == nil {
summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc() b.EventStats.WithLabelValues("timer").Inc()
} else { } else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc() b.ConflictingEventStats.WithLabelValues("timer").Inc()
} }
default: default:
@ -173,14 +177,20 @@ func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions *prometheus.
default: default:
level.Debug(b.Logger).Log("msg", "Unsupported event type") level.Debug(b.Logger).Log("msg", "Unsupported event type")
eventStats.WithLabelValues("illegal").Inc() b.EventStats.WithLabelValues("illegal").Inc()
} }
} }
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter { func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{ return &Exporter{
Mapper: mapper, Mapper: mapper,
Registry: registry.NewRegistry(mapper), Registry: registry.NewRegistry(mapper),
Logger: logger, Logger: logger,
EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped,
ErrorEventStats: errorEventStats,
EventStats: eventStats,
ConflictingEventStats: conflictingEventStats,
MetricsCount: metricsCount,
} }
} }

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package main package exporter
import ( import (
"fmt" "fmt"
@ -25,13 +25,133 @@ import (
"github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/registry" "github.com/prometheus/statsd_exporter/pkg/registry"
) )
var (
eventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_total",
Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.",
})
udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsing DogStatsD tags.",
},
)
configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_config_reloads_total",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "statsd_exporter_loaded_mappings",
Help: "The current number of configured metric mappings.",
})
conflictingEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_conflict_total",
Help: "The total number of StatsD events with conflicting names.",
},
[]string{"type"},
)
errorEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_error_total",
Help: "The total number of StatsD events discarded due to errors.",
},
[]string{"reason"},
)
eventsActions = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_actions_total",
Help: "The total number of StatsD events by action.",
},
[]string{"action"},
)
metricsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "statsd_exporter_metrics_total",
Help: "The total number of metrics.",
},
[]string{"type"},
)
)
// TestNegativeCounter validates when we send a negative // TestNegativeCounter validates when we send a negative
// number to a counter that we no longer panic the Exporter Listener. // number to a counter that we no longer panic the Exporter Listener.
func TestNegativeCounter(t *testing.T) { func TestNegativeCounter(t *testing.T) {
@ -64,8 +184,8 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter) updated := getTelemetryCounterValue(errorCounter)
if updated-prev != 1 { if updated-prev != 1 {
@ -145,8 +265,8 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err) t.Fatalf("Config load error: %s %s", config, err)
} }
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil { if err != nil {
@ -203,8 +323,8 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err) t.Fatalf("Config load error: %s %s", config, err)
} }
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil { if err != nil {
@ -418,8 +538,8 @@ mappings:
events <- s.in events <- s.in
close(events) close(events)
}() }()
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather() metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil { if err != nil {
@ -473,8 +593,8 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name") errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter) prev := getTelemetryCounterValue(errorCounter)
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter) updated := getTelemetryCounterValue(errorCounter)
if updated-prev != 1 { if updated-prev != 1 {
@ -498,9 +618,33 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
ueh := &event.UnbufferedEventHandler{C: events} ueh := &event.UnbufferedEventHandler{C: events}
go func() { go func() {
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{listener.StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
l.SetEventHandler(ueh) l.SetEventHandler(ueh)
l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived) l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
} }
close(events) close(events)
}() }()
@ -508,8 +652,8 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
} }
// In the case of someone starting the statsd exporter with no mapping file specified // In the case of someone starting the statsd exporter with no mapping file specified
@ -522,8 +666,8 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
}() }()
name := "default_foo" name := "default_foo"
@ -566,9 +710,9 @@ func TestHistogramUnits(t *testing.T) {
go func() { go func() {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.TimerType = mapper.TimerTypeHistogram ex.Mapper.Defaults.TimerType = mapper.TimerTypeHistogram
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
}() }()
// Synchronously send a statsd event to wait for handleEvent execution. // Synchronously send a statsd event to wait for handleEvent execution.
@ -605,8 +749,8 @@ func TestCounterIncrement(t *testing.T) {
go func() { go func() {
testMapper := mapper.MetricMapper{} testMapper := mapper.MetricMapper{}
testMapper.InitCache(0) testMapper.InitCache(0)
ex := exporter.NewExporter(&testMapper, log.NewNopLogger()) ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
}() }()
// Synchronously send a statsd event to wait for handleEvent execution. // Synchronously send a statsd event to wait for handleEvent execution.
@ -647,7 +791,7 @@ func TestCounterIncrement(t *testing.T) {
} }
type statsDPacketHandler interface { type statsDPacketHandler interface {
HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler) SetEventHandler(eh event.EventHandler)
} }
@ -656,7 +800,7 @@ type mockStatsDTCPListener struct {
log.Logger log.Logger
} }
func (ml *mockStatsDTCPListener) HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6 // Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses. // addresses.
lc, err := net.ListenTCP("tcp4", nil) lc, err := net.ListenTCP("tcp4", nil)
@ -684,7 +828,7 @@ func (ml *mockStatsDTCPListener) HandlePacket(packet []byte, udpPackets promethe
if err != nil { if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
} }
ml.HandleConn(sc, linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, sampleErrors, samplesReceived, tagErrors, tagsReceived) ml.HandleConn(sc)
} }
// TestTtlExpiration validates expiration of time series. // TestTtlExpiration validates expiration of time series.
@ -714,8 +858,8 @@ mappings:
events := make(chan event.Events) events := make(chan event.Events)
defer close(events) defer close(events)
go func() { go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger()) ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events)
}() }()
ev := event.Events{ ev := event.Events{

View file

@ -31,13 +31,20 @@ type StatsDUDPListener struct {
Conn *net.UDPConn Conn *net.UDPConn
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
UDPPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
} }
func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh l.EventHandler = eh
} }
func (l *StatsDUDPListener) Listen(udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535) buf := make([]byte, 65535)
for { for {
n, _, err := l.Conn.ReadFromUDP(buf) n, _, err := l.Conn.ReadFromUDP(buf)
@ -50,16 +57,16 @@ func (l *StatsDUDPListener) Listen(udpPackets prometheus.Counter, linesReceived
level.Error(l.Logger).Log("error", err) level.Error(l.Logger).Log("error", err)
return return
} }
l.HandlePacket(buf[0:n], udpPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) l.HandlePacket(buf[0:n])
} }
} }
func (l *StatsDUDPListener) HandlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDUDPListener) HandlePacket(packet []byte) {
udpPackets.Inc() l.UDPPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
linesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }
@ -67,13 +74,22 @@ type StatsDTCPListener struct {
Conn *net.TCPListener Conn *net.TCPListener
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
TCPConnections prometheus.Counter
TCPErrors prometheus.Counter
TCPLineTooLong prometheus.Counter
} }
func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh l.EventHandler = eh
} }
func (l *StatsDTCPListener) Listen(linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDTCPListener) Listen() {
for { for {
c, err := l.Conn.AcceptTCP() c, err := l.Conn.AcceptTCP()
if err != nil { if err != nil {
@ -85,32 +101,32 @@ func (l *StatsDTCPListener) Listen(linesReceived prometheus.Counter, eventsFlush
level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err) level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err)
os.Exit(1) os.Exit(1)
} }
go l.HandleConn(c, linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, sampleErrors, samplesReceived, tagErrors, tagsReceived) go l.HandleConn(c)
} }
} }
func (l *StatsDTCPListener) HandleConn(c *net.TCPConn, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
defer c.Close() defer c.Close()
tcpConnections.Inc() l.TCPConnections.Inc()
r := bufio.NewReader(c) r := bufio.NewReader(c)
for { for {
line, isPrefix, err := r.ReadLine() line, isPrefix, err := r.ReadLine()
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
tcpErrors.Inc() l.TCPErrors.Inc()
level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err) level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err)
} }
break break
} }
if isPrefix { if isPrefix {
tcpLineTooLong.Inc() l.TCPLineTooLong.Inc()
level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
break break
} }
linesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(string(line), sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }
@ -118,13 +134,20 @@ type StatsDUnixgramListener struct {
Conn *net.UnixConn Conn *net.UnixConn
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
UnixgramPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
} }
func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh l.EventHandler = eh
} }
func (l *StatsDUnixgramListener) Listen(unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDUnixgramListener) Listen() {
buf := make([]byte, 65535) buf := make([]byte, 65535)
for { for {
n, _, err := l.Conn.ReadFromUnix(buf) n, _, err := l.Conn.ReadFromUnix(buf)
@ -137,15 +160,15 @@ func (l *StatsDUnixgramListener) Listen(unixgramPackets prometheus.Counter, line
level.Error(l.Logger).Log(err) level.Error(l.Logger).Log(err)
os.Exit(1) os.Exit(1)
} }
l.HandlePacket(buf[:n], unixgramPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) l.HandlePacket(buf[:n])
} }
} }
func (l *StatsDUnixgramListener) HandlePacket(packet []byte, unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
unixgramPackets.Inc() l.UnixgramPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
linesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }