diff --git a/bridge_test.go b/bridge_test.go index aae7bf2..f4e2476 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -530,10 +531,17 @@ func TestHandlePacket(t *testing.T) { }, } + parser := line.NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() + for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ Conn: nil, EventHandler: nil, Logger: log.NewNopLogger(), + LineParser: parser, UDPPackets: udpPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, @@ -545,6 +553,7 @@ func TestHandlePacket(t *testing.T) { Conn: nil, EventHandler: nil, Logger: log.NewNopLogger(), + LineParser: parser, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 6d58569..5b27fe0 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/line" "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -47,6 +48,12 @@ func benchmarkUDPListener(times int, b *testing.B) { } } + parser := line.NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() + // reset benchmark timer to not measure startup costs b.ResetTimer() @@ -60,6 +67,7 @@ func benchmarkUDPListener(times int, b *testing.B) { l := listener.StatsDUDPListener{ EventHandler: &event.UnbufferedEventHandler{C: events}, Logger: logger, + LineParser: parser, UDPPackets: udpPackets, LinesReceived: linesReceived, SamplesReceived: samplesReceived, diff --git a/line_benchmark_test.go b/line_benchmark_test.go index 28166fa..a8bf07d 100644 --- a/line_benchmark_test.go +++ b/line_benchmark_test.go @@ -43,10 +43,19 @@ func benchmarkLinesToEvents(times int, b *testing.B, input []string) { // always report allocations since this is a hot path b.ReportAllocs() + parser := line.NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() + + // reset benchmark timer to not measure startup costs + b.ResetTimer() + for n := 0; n < b.N; n++ { for i := 0; i < times; i++ { for _, l := range input { - line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) + parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) } } } @@ -75,6 +84,12 @@ func BenchmarkLineFormats(b *testing.B) { "invalidInfluxDb": "foo3,tag1=bar,tag2:100|c", } + parser := line.NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() + // reset benchmark timer to not measure startup costs b.ResetTimer() @@ -83,7 +98,7 @@ func BenchmarkLineFormats(b *testing.B) { // always report allocations since this is a hot path b.ReportAllocs() for n := 0; n < b.N; n++ { - line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) + parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger) } }) } diff --git a/main.go b/main.go index ec3bc9c..6d8e09d 100644 --- a/main.go +++ b/main.go @@ -282,11 +282,19 @@ func main() { kingpin.Parse() logger := promlog.New(promlogConfig) - // Set line parsing options - line.DogstatsdTagsEnabled = *dogstatsdTagsEnabled - line.InfluxdbTagsEnabled = *influxdbTagsEnabled - line.LibratoTagsEnabled = *libratoTagsEnabled - line.SignalFXTagsEnabled = *signalFXTagsEnabled + parser := line.NewParser() + if *dogstatsdTagsEnabled { + parser.EnableDogstatsdParsing() + } + if *influxdbTagsEnabled { + parser.EnableInfluxdbParsing() + } + if *libratoTagsEnabled { + parser.EnableLibratoParsing() + } + if *signalFXTagsEnabled { + parser.EnableSignalFXParsing() + } cacheOption := mapper.WithCacheType(*cacheType) @@ -330,6 +338,7 @@ func main() { Conn: uconn, EventHandler: eventQueue, Logger: logger, + LineParser: parser, UDPPackets: udpPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, @@ -359,6 +368,7 @@ func main() { Conn: tconn, EventHandler: eventQueue, Logger: logger, + LineParser: parser, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, @@ -402,6 +412,7 @@ func main() { Conn: uxgconn, EventHandler: eventQueue, Logger: logger, + LineParser: parser, UnixgramPackets: unixgramPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index c92221b..644c5d7 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -617,11 +617,18 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { events := make(chan event.Events) ueh := &event.UnbufferedEventHandler{C: events} + parser := line.NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() + go func() { for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ Conn: nil, EventHandler: nil, Logger: log.NewNopLogger(), + LineParser: parser, UDPPackets: udpPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, @@ -633,6 +640,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { Conn: nil, EventHandler: nil, Logger: log.NewNopLogger(), + LineParser: parser, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, @@ -1059,11 +1067,16 @@ func BenchmarkParseDogStatsDTags(b *testing.B) { "a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5", } + parser := line.NewParser() + parser.EnableDogstatsdParsing() + + b.ResetTimer() + for name, tags := range scenarios { b.Run(name, func(b *testing.B) { for n := 0; n < b.N; n++ { labels := map[string]string{} - line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger()) + parser.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger()) } }) } diff --git a/pkg/line/line.go b/pkg/line/line.go index 50f93fd..baebe84 100644 --- a/pkg/line/line.go +++ b/pkg/line/line.go @@ -27,13 +27,39 @@ import ( "github.com/prometheus/statsd_exporter/pkg/mapper" ) -// These globals can be used to control parsing behavior -var ( - DogstatsdTagsEnabled = true - InfluxdbTagsEnabled = true - LibratoTagsEnabled = true - SignalFXTagsEnabled = true -) +// Parser is a struct to hold configuration for parsing behavior +type Parser struct { + DogstatsdTagsEnabled bool + InfluxdbTagsEnabled bool + LibratoTagsEnabled bool + SignalFXTagsEnabled bool +} + +// NewParser returns a new line parser +func NewParser() *Parser { + p := Parser{} + return &p +} + +// EnableDogstatsdParsing option to enable dogstatsd tag parsing +func (p *Parser) EnableDogstatsdParsing() { + p.DogstatsdTagsEnabled = true +} + +// EnableInfluxdbParsing option to enable influxdb tag parsing +func (p *Parser) EnableInfluxdbParsing() { + p.InfluxdbTagsEnabled = true +} + +// EnableLibratoParsing option to enable librato tag parsing +func (p *Parser) EnableLibratoParsing() { + p.LibratoTagsEnabled = true +} + +// EnableSignalFXParsing option to enable signalfx tag parsing +func (p *Parser) EnableSignalFXParsing() { + p.SignalFXTagsEnabled = true +} func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) { switch statType { @@ -122,8 +148,8 @@ func trimLeftHash(s string) string { return s } -func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { - if DogstatsdTagsEnabled { +func (p *Parser) ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { + if p.DogstatsdTagsEnabled { lastTagEndIndex := 0 for i, c := range component { if c == ',' { @@ -141,8 +167,8 @@ func ParseDogStatsDTags(component string, labels map[string]string, tagErrors pr } } -func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string { - if SignalFXTagsEnabled { +func (p *Parser) parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string { + if p.SignalFXTagsEnabled { // check for SignalFx tags first // `[` delimits start of tags by SignalFx // `]` delimits end of tags by SignalFx @@ -168,7 +194,7 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu // https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags // `,` delimits start of tags by InfluxDB // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd - if (c == '#' && LibratoTagsEnabled) || (c == ',' && InfluxdbTagsEnabled) { + if (c == '#' && p.LibratoTagsEnabled) || (c == ',' && p.InfluxdbTagsEnabled) { parseNameTags(name[i+1:], labels, tagErrors, logger) return name[:i] } @@ -176,7 +202,7 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu return name } -func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events { +func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events { events := event.Events{} if line == "" { return events @@ -190,7 +216,7 @@ func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceiv } labels := map[string]string{} - metric := parseNameAndTags(elements[0], labels, tagErrors, logger) + metric := p.parseNameAndTags(elements[0], labels, tagErrors, logger) var samples []string if strings.Contains(elements[1], "|#") { @@ -264,7 +290,7 @@ samples: multiplyEvents = int(1 / samplingFactor) } case '#': - ParseDogStatsDTags(component[1:], labels, tagErrors, logger) + p.ParseDogStatsDTags(component[1:], labels, tagErrors, logger) default: level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line) sampleErrors.WithLabelValues("invalid_sample_factor").Inc() diff --git a/pkg/line/line_test.go b/pkg/line/line_test.go index 2d444cc..b4b67aa 100644 --- a/pkg/line/line_test.go +++ b/pkg/line/line_test.go @@ -491,14 +491,15 @@ func TestLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = true - InfluxdbTagsEnabled = true - SignalFXTagsEnabled = true - LibratoTagsEnabled = true + parser := NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { @@ -730,14 +731,11 @@ func TestDisableParsingLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = false - InfluxdbTagsEnabled = false - SignalFXTagsEnabled = false - LibratoTagsEnabled = false + parser := NewParser() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { @@ -969,14 +967,14 @@ func TestDisableParsingDogstatsdLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = false - InfluxdbTagsEnabled = true - SignalFXTagsEnabled = true - LibratoTagsEnabled = true + parser := NewParser() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { @@ -1208,14 +1206,14 @@ func TestDisableParsingInfluxdbLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = true - InfluxdbTagsEnabled = false - SignalFXTagsEnabled = true - LibratoTagsEnabled = true + parser := NewParser() + parser.EnableDogstatsdParsing() + parser.EnableLibratoParsing() + parser.EnableSignalFXParsing() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { @@ -1447,14 +1445,14 @@ func TestDisableParsingSignalfxLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = true - InfluxdbTagsEnabled = true - SignalFXTagsEnabled = false - LibratoTagsEnabled = true + parser := NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableLibratoParsing() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { @@ -1686,14 +1684,14 @@ func TestDisableParsingLibratoLineToEvents(t *testing.T) { }, } - DogstatsdTagsEnabled = true - InfluxdbTagsEnabled = true - SignalFXTagsEnabled = true - LibratoTagsEnabled = false + parser := NewParser() + parser.EnableDogstatsdParsing() + parser.EnableInfluxdbParsing() + parser.EnableSignalFXParsing() for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - events := LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) + events := parser.LineToEvents(testCase.in, *nopSampleErrors, nopSamplesReceived, nopTagErrors, nopTagsReceived, nopLogger) for j, expected := range testCase.out { if !reflect.DeepEqual(&expected, &events[j]) { diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 5c31f53..0607180 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -31,6 +31,7 @@ type StatsDUDPListener struct { Conn *net.UDPConn EventHandler event.EventHandler Logger log.Logger + LineParser *pkgLine.Parser UDPPackets prometheus.Counter LinesReceived prometheus.Counter EventsFlushed prometheus.Counter @@ -67,7 +68,7 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) { for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) l.LinesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } @@ -75,6 +76,7 @@ type StatsDTCPListener struct { Conn *net.TCPListener EventHandler event.EventHandler Logger log.Logger + LineParser *pkgLine.Parser LinesReceived prometheus.Counter EventsFlushed prometheus.Counter SampleErrors prometheus.CounterVec @@ -128,7 +130,7 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { break } l.LinesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } @@ -136,6 +138,7 @@ type StatsDUnixgramListener struct { Conn *net.UnixConn EventHandler event.EventHandler Logger log.Logger + LineParser *pkgLine.Parser UnixgramPackets prometheus.Counter LinesReceived prometheus.Counter EventsFlushed prometheus.Counter @@ -172,6 +175,6 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line) l.LinesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } }