move to line parser struct and option functions

Signed-off-by: glightfoot <glightfoot@rsglab.com>
This commit is contained in:
glightfoot 2020-08-10 12:44:23 -04:00
parent db25b1d658
commit 6942b5a4f3
8 changed files with 139 additions and 56 deletions

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
) )
@ -530,10 +531,17 @@ func TestHandlePacket(t *testing.T) {
}, },
} }
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil, Conn: nil,
EventHandler: nil, EventHandler: nil,
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
@ -545,6 +553,7 @@ func TestHandlePacket(t *testing.T) {
Conn: nil, Conn: nil,
EventHandler: nil, EventHandler: nil,
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors, SampleErrors: *sampleErrors,

View file

@ -21,6 +21,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter" "github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
) )
@ -47,6 +48,12 @@ func benchmarkUDPListener(times int, b *testing.B) {
} }
} }
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
// reset benchmark timer to not measure startup costs // reset benchmark timer to not measure startup costs
b.ResetTimer() b.ResetTimer()
@ -60,6 +67,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
l := listener.StatsDUDPListener{ l := listener.StatsDUDPListener{
EventHandler: &event.UnbufferedEventHandler{C: events}, EventHandler: &event.UnbufferedEventHandler{C: events},
Logger: logger, Logger: logger,
LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
LinesReceived: linesReceived, LinesReceived: linesReceived,
SamplesReceived: samplesReceived, SamplesReceived: samplesReceived,

View file

@ -43,10 +43,19 @@ func benchmarkLinesToEvents(times int, b *testing.B, input []string) {
// always report allocations since this is a hot path // always report allocations since this is a hot path
b.ReportAllocs() b.ReportAllocs()
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
// reset benchmark timer to not measure startup costs
b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
for i := 0; i < times; i++ { for i := 0; i < times; i++ {
for _, l := range input { for _, l := range input {
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
} }
} }
} }
@ -75,6 +84,12 @@ func BenchmarkLineFormats(b *testing.B) {
"invalidInfluxDb": "foo3,tag1=bar,tag2:100|c", "invalidInfluxDb": "foo3,tag1=bar,tag2:100|c",
} }
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
// reset benchmark timer to not measure startup costs // reset benchmark timer to not measure startup costs
b.ResetTimer() b.ResetTimer()
@ -83,7 +98,7 @@ func BenchmarkLineFormats(b *testing.B) {
// always report allocations since this is a hot path // always report allocations since this is a hot path
b.ReportAllocs() b.ReportAllocs()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
} }
}) })
} }

21
main.go
View file

@ -282,11 +282,19 @@ func main() {
kingpin.Parse() kingpin.Parse()
logger := promlog.New(promlogConfig) logger := promlog.New(promlogConfig)
// Set line parsing options parser := line.NewParser()
line.DogstatsdTagsEnabled = *dogstatsdTagsEnabled if *dogstatsdTagsEnabled {
line.InfluxdbTagsEnabled = *influxdbTagsEnabled parser.EnableDogstatsdParsing()
line.LibratoTagsEnabled = *libratoTagsEnabled }
line.SignalFXTagsEnabled = *signalFXTagsEnabled if *influxdbTagsEnabled {
parser.EnableInfluxdbParsing()
}
if *libratoTagsEnabled {
parser.EnableLibratoParsing()
}
if *signalFXTagsEnabled {
parser.EnableSignalFXParsing()
}
cacheOption := mapper.WithCacheType(*cacheType) cacheOption := mapper.WithCacheType(*cacheType)
@ -330,6 +338,7 @@ func main() {
Conn: uconn, Conn: uconn,
EventHandler: eventQueue, EventHandler: eventQueue,
Logger: logger, Logger: logger,
LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
@ -359,6 +368,7 @@ func main() {
Conn: tconn, Conn: tconn,
EventHandler: eventQueue, EventHandler: eventQueue,
Logger: logger, Logger: logger,
LineParser: parser,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors, SampleErrors: *sampleErrors,
@ -402,6 +412,7 @@ func main() {
Conn: uxgconn, Conn: uxgconn,
EventHandler: eventQueue, EventHandler: eventQueue,
Logger: logger, Logger: logger,
LineParser: parser,
UnixgramPackets: unixgramPackets, UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,

View file

@ -617,11 +617,18 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
events := make(chan event.Events) events := make(chan event.Events)
ueh := &event.UnbufferedEventHandler{C: events} ueh := &event.UnbufferedEventHandler{C: events}
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
go func() { go func() {
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil, Conn: nil,
EventHandler: nil, EventHandler: nil,
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
@ -633,6 +640,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
Conn: nil, Conn: nil,
EventHandler: nil, EventHandler: nil,
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors, SampleErrors: *sampleErrors,
@ -1059,11 +1067,16 @@ func BenchmarkParseDogStatsDTags(b *testing.B) {
"a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5", "a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5",
} }
parser := line.NewParser()
parser.EnableDogstatsdParsing()
b.ResetTimer()
for name, tags := range scenarios { for name, tags := range scenarios {
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
labels := map[string]string{} labels := map[string]string{}
line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger()) parser.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
} }
}) })
} }

View file

@ -27,13 +27,39 @@ import (
"github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mapper"
) )
// These globals can be used to control parsing behavior // Parser is a struct to hold configuration for parsing behavior
var ( type Parser struct {
DogstatsdTagsEnabled = true DogstatsdTagsEnabled bool
InfluxdbTagsEnabled = true InfluxdbTagsEnabled bool
LibratoTagsEnabled = true LibratoTagsEnabled bool
SignalFXTagsEnabled = true SignalFXTagsEnabled bool
) }
// NewParser returns a new line parser
func NewParser() *Parser {
p := Parser{}
return &p
}
// EnableDogstatsdParsing option to enable dogstatsd tag parsing
func (p *Parser) EnableDogstatsdParsing() {
p.DogstatsdTagsEnabled = true
}
// EnableInfluxdbParsing option to enable influxdb tag parsing
func (p *Parser) EnableInfluxdbParsing() {
p.InfluxdbTagsEnabled = true
}
// EnableLibratoParsing option to enable librato tag parsing
func (p *Parser) EnableLibratoParsing() {
p.LibratoTagsEnabled = true
}
// EnableSignalFXParsing option to enable signalfx tag parsing
func (p *Parser) EnableSignalFXParsing() {
p.SignalFXTagsEnabled = true
}
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) { func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) {
switch statType { switch statType {
@ -122,8 +148,8 @@ func trimLeftHash(s string) string {
return s return s
} }
func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { func (p *Parser) ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
if DogstatsdTagsEnabled { if p.DogstatsdTagsEnabled {
lastTagEndIndex := 0 lastTagEndIndex := 0
for i, c := range component { for i, c := range component {
if c == ',' { if c == ',' {
@ -141,8 +167,8 @@ func ParseDogStatsDTags(component string, labels map[string]string, tagErrors pr
} }
} }
func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string { func (p *Parser) parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
if SignalFXTagsEnabled { if p.SignalFXTagsEnabled {
// check for SignalFx tags first // check for SignalFx tags first
// `[` delimits start of tags by SignalFx // `[` delimits start of tags by SignalFx
// `]` delimits end of tags by SignalFx // `]` delimits end of tags by SignalFx
@ -168,7 +194,7 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags // https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
// `,` delimits start of tags by InfluxDB // `,` delimits start of tags by InfluxDB
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
if (c == '#' && LibratoTagsEnabled) || (c == ',' && InfluxdbTagsEnabled) { if (c == '#' && p.LibratoTagsEnabled) || (c == ',' && p.InfluxdbTagsEnabled) {
parseNameTags(name[i+1:], labels, tagErrors, logger) parseNameTags(name[i+1:], labels, tagErrors, logger)
return name[:i] return name[:i]
} }
@ -176,7 +202,7 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu
return name return name
} }
func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events { func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
events := event.Events{} events := event.Events{}
if line == "" { if line == "" {
return events return events
@ -190,7 +216,7 @@ func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceiv
} }
labels := map[string]string{} labels := map[string]string{}
metric := parseNameAndTags(elements[0], labels, tagErrors, logger) metric := p.parseNameAndTags(elements[0], labels, tagErrors, logger)
var samples []string var samples []string
if strings.Contains(elements[1], "|#") { if strings.Contains(elements[1], "|#") {
@ -264,7 +290,7 @@ samples:
multiplyEvents = int(1 / samplingFactor) multiplyEvents = int(1 / samplingFactor)
} }
case '#': case '#':
ParseDogStatsDTags(component[1:], labels, tagErrors, logger) p.ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
default: default:
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line) level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc() sampleErrors.WithLabelValues("invalid_sample_factor").Inc()

View file

@ -491,14 +491,15 @@ func TestLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = true parser := NewParser()
InfluxdbTagsEnabled = true parser.EnableDogstatsdParsing()
SignalFXTagsEnabled = true parser.EnableInfluxdbParsing()
LibratoTagsEnabled = true parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {
@ -730,14 +731,11 @@ func TestDisableParsingLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = false parser := NewParser()
InfluxdbTagsEnabled = false
SignalFXTagsEnabled = false
LibratoTagsEnabled = false
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {
@ -969,14 +967,14 @@ func TestDisableParsingDogstatsdLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = false parser := NewParser()
InfluxdbTagsEnabled = true parser.EnableInfluxdbParsing()
SignalFXTagsEnabled = true parser.EnableLibratoParsing()
LibratoTagsEnabled = true parser.EnableSignalFXParsing()
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {
@ -1208,14 +1206,14 @@ func TestDisableParsingInfluxdbLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = true parser := NewParser()
InfluxdbTagsEnabled = false parser.EnableDogstatsdParsing()
SignalFXTagsEnabled = true parser.EnableLibratoParsing()
LibratoTagsEnabled = true parser.EnableSignalFXParsing()
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {
@ -1447,14 +1445,14 @@ func TestDisableParsingSignalfxLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = true parser := NewParser()
InfluxdbTagsEnabled = true parser.EnableDogstatsdParsing()
SignalFXTagsEnabled = false parser.EnableInfluxdbParsing()
LibratoTagsEnabled = true parser.EnableLibratoParsing()
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {
@ -1686,14 +1684,14 @@ func TestDisableParsingLibratoLineToEvents(t *testing.T) {
}, },
} }
DogstatsdTagsEnabled = true parser := NewParser()
InfluxdbTagsEnabled = true parser.EnableDogstatsdParsing()
SignalFXTagsEnabled = true parser.EnableInfluxdbParsing()
LibratoTagsEnabled = false parser.EnableSignalFXParsing()
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger)
for j, expected := range testCase.out { for j, expected := range testCase.out {
if !reflect.DeepEqual(&expected, &events[j]) { if !reflect.DeepEqual(&expected, &events[j]) {

View file

@ -31,6 +31,7 @@ type StatsDUDPListener struct {
Conn *net.UDPConn Conn *net.UDPConn
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
LineParser *pkgLine.Parser
UDPPackets prometheus.Counter UDPPackets prometheus.Counter
LinesReceived prometheus.Counter LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter EventsFlushed prometheus.Counter
@ -67,7 +68,7 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) {
for _, line := range lines { for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
l.LinesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }
@ -75,6 +76,7 @@ type StatsDTCPListener struct {
Conn *net.TCPListener Conn *net.TCPListener
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
LineParser *pkgLine.Parser
LinesReceived prometheus.Counter LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec SampleErrors prometheus.CounterVec
@ -128,7 +130,7 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
break break
} }
l.LinesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }
@ -136,6 +138,7 @@ type StatsDUnixgramListener struct {
Conn *net.UnixConn Conn *net.UnixConn
EventHandler event.EventHandler EventHandler event.EventHandler
Logger log.Logger Logger log.Logger
LineParser *pkgLine.Parser
UnixgramPackets prometheus.Counter UnixgramPackets prometheus.Counter
LinesReceived prometheus.Counter LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter EventsFlushed prometheus.Counter
@ -172,6 +175,6 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
for _, line := range lines { for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line) level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line)
l.LinesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
} }