forked from mirrors/statsd_exporter
Merge pull request #325 from glightfoot/disable-tags
Optionally disable tag parsing
This commit is contained in:
commit
e0a39974e2
9 changed files with 1866 additions and 39 deletions
16
README.md
16
README.md
|
@ -77,6 +77,14 @@ metric.name[tagName=val,tag2Name=val2]:0|c
|
||||||
Be aware: If you mix tag styles (e.g., Librato/InfluxDB with DogStatsD), the exporter will consider this an error and the behavior is undefined.
|
Be aware: If you mix tag styles (e.g., Librato/InfluxDB with DogStatsD), the exporter will consider this an error and the behavior is undefined.
|
||||||
Also, tags without values (`#some_tag`) are not supported and will be ignored.
|
Also, tags without values (`#some_tag`) are not supported and will be ignored.
|
||||||
|
|
||||||
|
The exporter parses all tagging formats by default, but individual tagging formats can be disabled with command line flags:
|
||||||
|
```
|
||||||
|
--no-statsd.parse-dogstatsd-tags
|
||||||
|
--no-statsd.parse-influxdb-tags
|
||||||
|
--no-statsd.parse-librato-tags
|
||||||
|
--no-statsd.parse-signalfx-tags
|
||||||
|
```
|
||||||
|
|
||||||
## Building and Running
|
## Building and Running
|
||||||
|
|
||||||
NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/kingpin) flags library. With this change, flag behaviour is POSIX-ish:
|
NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/kingpin) flags library. With this change, flag behaviour is POSIX-ish:
|
||||||
|
@ -131,6 +139,14 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
|
||||||
--debug.dump-fsm="" The path to dump internal FSM generated for glob
|
--debug.dump-fsm="" The path to dump internal FSM generated for glob
|
||||||
matching as Dot file.
|
matching as Dot file.
|
||||||
--check-config Check configuration and exit.
|
--check-config Check configuration and exit.
|
||||||
|
--statsd.parse-dogstatsd-tags
|
||||||
|
Parse DogStatsd style tags. Enabled by default.
|
||||||
|
--statsd.parse-influxdb-tags
|
||||||
|
Parse InfluxDB style tags. Enabled by default.
|
||||||
|
--statsd.parse-librato-tags
|
||||||
|
Parse Librato style tags. Enabled by default.
|
||||||
|
--statsd.parse-signalfx-tags
|
||||||
|
Parse SignalFX style tags. Enabled by default.
|
||||||
--log.level=info Only log messages with the given severity or
|
--log.level=info Only log messages with the given severity or
|
||||||
above. One of: [debug, info, warn, error]
|
above. One of: [debug, info, warn, error]
|
||||||
--log.format=logfmt Output format of log messages. One of: [logfmt,
|
--log.format=logfmt Output format of log messages. One of: [logfmt,
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/prometheus/statsd_exporter/pkg/clock"
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/event"
|
"github.com/prometheus/statsd_exporter/pkg/event"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/line"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/listener"
|
"github.com/prometheus/statsd_exporter/pkg/listener"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
@ -530,10 +531,17 @@ func TestHandlePacket(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
|
||||||
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
|
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
|
||||||
Conn: nil,
|
Conn: nil,
|
||||||
EventHandler: nil,
|
EventHandler: nil,
|
||||||
Logger: log.NewNopLogger(),
|
Logger: log.NewNopLogger(),
|
||||||
|
LineParser: parser,
|
||||||
UDPPackets: udpPackets,
|
UDPPackets: udpPackets,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
|
@ -545,6 +553,7 @@ func TestHandlePacket(t *testing.T) {
|
||||||
Conn: nil,
|
Conn: nil,
|
||||||
EventHandler: nil,
|
EventHandler: nil,
|
||||||
Logger: log.NewNopLogger(),
|
Logger: log.NewNopLogger(),
|
||||||
|
LineParser: parser,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
SampleErrors: *sampleErrors,
|
SampleErrors: *sampleErrors,
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/statsd_exporter/pkg/event"
|
"github.com/prometheus/statsd_exporter/pkg/event"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/line"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/listener"
|
"github.com/prometheus/statsd_exporter/pkg/listener"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
@ -47,6 +48,12 @@ func benchmarkUDPListener(times int, b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
|
||||||
// reset benchmark timer to not measure startup costs
|
// reset benchmark timer to not measure startup costs
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
|
@ -60,6 +67,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
|
||||||
l := listener.StatsDUDPListener{
|
l := listener.StatsDUDPListener{
|
||||||
EventHandler: &event.UnbufferedEventHandler{C: events},
|
EventHandler: &event.UnbufferedEventHandler{C: events},
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
|
LineParser: parser,
|
||||||
UDPPackets: udpPackets,
|
UDPPackets: udpPackets,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
SamplesReceived: samplesReceived,
|
SamplesReceived: samplesReceived,
|
||||||
|
|
|
@ -43,10 +43,19 @@ func benchmarkLinesToEvents(times int, b *testing.B, input []string) {
|
||||||
// always report allocations since this is a hot path
|
// always report allocations since this is a hot path
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
|
||||||
|
// reset benchmark timer to not measure startup costs
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
for i := 0; i < times; i++ {
|
for i := 0; i < times; i++ {
|
||||||
for _, l := range input {
|
for _, l := range input {
|
||||||
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
|
parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -75,6 +84,12 @@ func BenchmarkLineFormats(b *testing.B) {
|
||||||
"invalidInfluxDb": "foo3,tag1=bar,tag2:100|c",
|
"invalidInfluxDb": "foo3,tag1=bar,tag2:100|c",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
|
||||||
// reset benchmark timer to not measure startup costs
|
// reset benchmark timer to not measure startup costs
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
|
@ -83,7 +98,7 @@ func BenchmarkLineFormats(b *testing.B) {
|
||||||
// always report allocations since this is a hot path
|
// always report allocations since this is a hot path
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
|
parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
22
main.go
22
main.go
|
@ -35,6 +35,7 @@ import (
|
||||||
"github.com/prometheus/statsd_exporter/pkg/address"
|
"github.com/prometheus/statsd_exporter/pkg/address"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/event"
|
"github.com/prometheus/statsd_exporter/pkg/event"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
"github.com/prometheus/statsd_exporter/pkg/exporter"
|
||||||
|
"github.com/prometheus/statsd_exporter/pkg/line"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/listener"
|
"github.com/prometheus/statsd_exporter/pkg/listener"
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
@ -268,6 +269,10 @@ func main() {
|
||||||
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()
|
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()
|
||||||
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
|
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
|
||||||
checkConfig = kingpin.Flag("check-config", "Check configuration and exit.").Default("false").Bool()
|
checkConfig = kingpin.Flag("check-config", "Check configuration and exit.").Default("false").Bool()
|
||||||
|
dogstatsdTagsEnabled = kingpin.Flag("statsd.parse-dogstatsd-tags", "Parse DogStatsd style tags. Enabled by default.").Default("true").Bool()
|
||||||
|
influxdbTagsEnabled = kingpin.Flag("statsd.parse-influxdb-tags", "Parse InfluxDB style tags. Enabled by default.").Default("true").Bool()
|
||||||
|
libratoTagsEnabled = kingpin.Flag("statsd.parse-librato-tags", "Parse Librato style tags. Enabled by default.").Default("true").Bool()
|
||||||
|
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
|
||||||
)
|
)
|
||||||
|
|
||||||
promlogConfig := &promlog.Config{}
|
promlogConfig := &promlog.Config{}
|
||||||
|
@ -277,6 +282,20 @@ func main() {
|
||||||
kingpin.Parse()
|
kingpin.Parse()
|
||||||
logger := promlog.New(promlogConfig)
|
logger := promlog.New(promlogConfig)
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
if *dogstatsdTagsEnabled {
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
}
|
||||||
|
if *influxdbTagsEnabled {
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
}
|
||||||
|
if *libratoTagsEnabled {
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
}
|
||||||
|
if *signalFXTagsEnabled {
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
}
|
||||||
|
|
||||||
cacheOption := mapper.WithCacheType(*cacheType)
|
cacheOption := mapper.WithCacheType(*cacheType)
|
||||||
|
|
||||||
if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
|
if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
|
||||||
|
@ -319,6 +338,7 @@ func main() {
|
||||||
Conn: uconn,
|
Conn: uconn,
|
||||||
EventHandler: eventQueue,
|
EventHandler: eventQueue,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
|
LineParser: parser,
|
||||||
UDPPackets: udpPackets,
|
UDPPackets: udpPackets,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
|
@ -348,6 +368,7 @@ func main() {
|
||||||
Conn: tconn,
|
Conn: tconn,
|
||||||
EventHandler: eventQueue,
|
EventHandler: eventQueue,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
|
LineParser: parser,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
SampleErrors: *sampleErrors,
|
SampleErrors: *sampleErrors,
|
||||||
|
@ -391,6 +412,7 @@ func main() {
|
||||||
Conn: uxgconn,
|
Conn: uxgconn,
|
||||||
EventHandler: eventQueue,
|
EventHandler: eventQueue,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
|
LineParser: parser,
|
||||||
UnixgramPackets: unixgramPackets,
|
UnixgramPackets: unixgramPackets,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
|
|
|
@ -617,11 +617,18 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
||||||
events := make(chan event.Events)
|
events := make(chan event.Events)
|
||||||
ueh := &event.UnbufferedEventHandler{C: events}
|
ueh := &event.UnbufferedEventHandler{C: events}
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
parser.EnableInfluxdbParsing()
|
||||||
|
parser.EnableLibratoParsing()
|
||||||
|
parser.EnableSignalFXParsing()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
|
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
|
||||||
Conn: nil,
|
Conn: nil,
|
||||||
EventHandler: nil,
|
EventHandler: nil,
|
||||||
Logger: log.NewNopLogger(),
|
Logger: log.NewNopLogger(),
|
||||||
|
LineParser: parser,
|
||||||
UDPPackets: udpPackets,
|
UDPPackets: udpPackets,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
|
@ -633,6 +640,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
||||||
Conn: nil,
|
Conn: nil,
|
||||||
EventHandler: nil,
|
EventHandler: nil,
|
||||||
Logger: log.NewNopLogger(),
|
Logger: log.NewNopLogger(),
|
||||||
|
LineParser: parser,
|
||||||
LinesReceived: linesReceived,
|
LinesReceived: linesReceived,
|
||||||
EventsFlushed: eventsFlushed,
|
EventsFlushed: eventsFlushed,
|
||||||
SampleErrors: *sampleErrors,
|
SampleErrors: *sampleErrors,
|
||||||
|
@ -1059,11 +1067,16 @@ func BenchmarkParseDogStatsDTags(b *testing.B) {
|
||||||
"a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5",
|
"a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parser := line.NewParser()
|
||||||
|
parser.EnableDogstatsdParsing()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
for name, tags := range scenarios {
|
for name, tags := range scenarios {
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
labels := map[string]string{}
|
labels := map[string]string{}
|
||||||
line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
|
parser.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,40 @@ import (
|
||||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Parser is a struct to hold configuration for parsing behavior
|
||||||
|
type Parser struct {
|
||||||
|
DogstatsdTagsEnabled bool
|
||||||
|
InfluxdbTagsEnabled bool
|
||||||
|
LibratoTagsEnabled bool
|
||||||
|
SignalFXTagsEnabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewParser returns a new line parser
|
||||||
|
func NewParser() *Parser {
|
||||||
|
p := Parser{}
|
||||||
|
return &p
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableDogstatsdParsing option to enable dogstatsd tag parsing
|
||||||
|
func (p *Parser) EnableDogstatsdParsing() {
|
||||||
|
p.DogstatsdTagsEnabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableInfluxdbParsing option to enable influxdb tag parsing
|
||||||
|
func (p *Parser) EnableInfluxdbParsing() {
|
||||||
|
p.InfluxdbTagsEnabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableLibratoParsing option to enable librato tag parsing
|
||||||
|
func (p *Parser) EnableLibratoParsing() {
|
||||||
|
p.LibratoTagsEnabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableSignalFXParsing option to enable signalfx tag parsing
|
||||||
|
func (p *Parser) EnableSignalFXParsing() {
|
||||||
|
p.SignalFXTagsEnabled = true
|
||||||
|
}
|
||||||
|
|
||||||
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) {
|
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) {
|
||||||
switch statType {
|
switch statType {
|
||||||
case "c":
|
case "c":
|
||||||
|
@ -114,7 +148,8 @@ func trimLeftHash(s string) string {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
|
func (p *Parser) ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
|
||||||
|
if p.DogstatsdTagsEnabled {
|
||||||
lastTagEndIndex := 0
|
lastTagEndIndex := 0
|
||||||
for i, c := range component {
|
for i, c := range component {
|
||||||
if c == ',' {
|
if c == ',' {
|
||||||
|
@ -130,8 +165,10 @@ func ParseDogStatsDTags(component string, labels map[string]string, tagErrors pr
|
||||||
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
|
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
|
func (p *Parser) parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
|
||||||
|
if p.SignalFXTagsEnabled {
|
||||||
// check for SignalFx tags first
|
// check for SignalFx tags first
|
||||||
// `[` delimits start of tags by SignalFx
|
// `[` delimits start of tags by SignalFx
|
||||||
// `]` delimits end of tags by SignalFx
|
// `]` delimits end of tags by SignalFx
|
||||||
|
@ -150,13 +187,14 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu
|
||||||
tagErrors.Inc()
|
tagErrors.Inc()
|
||||||
return name
|
return name
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i, c := range name {
|
for i, c := range name {
|
||||||
// `#` delimits start of tags by Librato
|
// `#` delimits start of tags by Librato
|
||||||
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
|
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
|
||||||
// `,` delimits start of tags by InfluxDB
|
// `,` delimits start of tags by InfluxDB
|
||||||
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
|
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
|
||||||
if c == '#' || c == ',' {
|
if (c == '#' && p.LibratoTagsEnabled) || (c == ',' && p.InfluxdbTagsEnabled) {
|
||||||
parseNameTags(name[i+1:], labels, tagErrors, logger)
|
parseNameTags(name[i+1:], labels, tagErrors, logger)
|
||||||
return name[:i]
|
return name[:i]
|
||||||
}
|
}
|
||||||
|
@ -164,7 +202,7 @@ func parseNameAndTags(name string, labels map[string]string, tagErrors prometheu
|
||||||
return name
|
return name
|
||||||
}
|
}
|
||||||
|
|
||||||
func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
|
func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
|
||||||
events := event.Events{}
|
events := event.Events{}
|
||||||
if line == "" {
|
if line == "" {
|
||||||
return events
|
return events
|
||||||
|
@ -178,7 +216,7 @@ func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceiv
|
||||||
}
|
}
|
||||||
|
|
||||||
labels := map[string]string{}
|
labels := map[string]string{}
|
||||||
metric := parseNameAndTags(elements[0], labels, tagErrors, logger)
|
metric := p.parseNameAndTags(elements[0], labels, tagErrors, logger)
|
||||||
|
|
||||||
var samples []string
|
var samples []string
|
||||||
if strings.Contains(elements[1], "|#") {
|
if strings.Contains(elements[1], "|#") {
|
||||||
|
@ -252,7 +290,7 @@ samples:
|
||||||
multiplyEvents = int(1 / samplingFactor)
|
multiplyEvents = int(1 / samplingFactor)
|
||||||
}
|
}
|
||||||
case '#':
|
case '#':
|
||||||
ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
|
p.ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
|
||||||
default:
|
default:
|
||||||
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
|
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
|
||||||
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
|
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
|
||||||
|
|
1703
pkg/line/line_test.go
Normal file
1703
pkg/line/line_test.go
Normal file
File diff suppressed because it is too large
Load diff
|
@ -31,6 +31,7 @@ type StatsDUDPListener struct {
|
||||||
Conn *net.UDPConn
|
Conn *net.UDPConn
|
||||||
EventHandler event.EventHandler
|
EventHandler event.EventHandler
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
|
LineParser *pkgLine.Parser
|
||||||
UDPPackets prometheus.Counter
|
UDPPackets prometheus.Counter
|
||||||
LinesReceived prometheus.Counter
|
LinesReceived prometheus.Counter
|
||||||
EventsFlushed prometheus.Counter
|
EventsFlushed prometheus.Counter
|
||||||
|
@ -67,7 +68,7 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) {
|
||||||
for _, line := range lines {
|
for _, line := range lines {
|
||||||
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
|
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
|
||||||
l.LinesReceived.Inc()
|
l.LinesReceived.Inc()
|
||||||
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,6 +76,7 @@ type StatsDTCPListener struct {
|
||||||
Conn *net.TCPListener
|
Conn *net.TCPListener
|
||||||
EventHandler event.EventHandler
|
EventHandler event.EventHandler
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
|
LineParser *pkgLine.Parser
|
||||||
LinesReceived prometheus.Counter
|
LinesReceived prometheus.Counter
|
||||||
EventsFlushed prometheus.Counter
|
EventsFlushed prometheus.Counter
|
||||||
SampleErrors prometheus.CounterVec
|
SampleErrors prometheus.CounterVec
|
||||||
|
@ -128,7 +130,7 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
l.LinesReceived.Inc()
|
l.LinesReceived.Inc()
|
||||||
l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,6 +138,7 @@ type StatsDUnixgramListener struct {
|
||||||
Conn *net.UnixConn
|
Conn *net.UnixConn
|
||||||
EventHandler event.EventHandler
|
EventHandler event.EventHandler
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
|
LineParser *pkgLine.Parser
|
||||||
UnixgramPackets prometheus.Counter
|
UnixgramPackets prometheus.Counter
|
||||||
LinesReceived prometheus.Counter
|
LinesReceived prometheus.Counter
|
||||||
EventsFlushed prometheus.Counter
|
EventsFlushed prometheus.Counter
|
||||||
|
@ -172,6 +175,6 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
|
||||||
for _, line := range lines {
|
for _, line := range lines {
|
||||||
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line)
|
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line)
|
||||||
l.LinesReceived.Inc()
|
l.LinesReceived.Inc()
|
||||||
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue