Break out the telemetry during sample processing

The "packets" metric had a heavily overloaded meaning across its different
"outcomes", and would often be incremented multiple times; sometimes a
single (per-line) increment in one outcome even corresponded to multiple
increments in another.

This removes the broken metric and replaces it with separate total and
error counters for each level of processing (UDP packets / TCP
connections, lines, samples, and DogStatsD tags). This allows monitoring
the network traffic handled separately from the samples resulting from it.
This commit is contained in:
Matthias Rampke 2017-11-10 19:23:54 +00:00
parent d4d0b4a6a7
commit cdf79ba2f9
2 changed files with 79 additions and 18 deletions

View file

@ -404,14 +404,14 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma
func parseDogStatsDTagsToLabels(component string) map[string]string { func parseDogStatsDTagsToLabels(component string) map[string]string {
labels := map[string]string{} labels := map[string]string{}
networkStats.WithLabelValues("dogstatsd_tags").Inc() tagsReceived.Inc()
tags := strings.Split(component, ",") tags := strings.Split(component, ",")
for _, t := range tags { for _, t := range tags {
t = strings.TrimPrefix(t, "#") t = strings.TrimPrefix(t, "#")
kv := strings.SplitN(t, ":", 2) kv := strings.SplitN(t, ":", 2)
if len(kv) < 2 || len(kv[1]) == 0 { if len(kv) < 2 || len(kv[1]) == 0 {
networkStats.WithLabelValues("malformed_dogstatsd_tag").Inc() tagErrors.Inc()
log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component) log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component)
continue continue
} }
@ -429,7 +429,7 @@ func lineToEvents(line string) Events {
elements := strings.SplitN(line, ":", 2) elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc() sampleErrors.WithLabelValues("malformed_line").Inc()
log.Debugln("Bad line from StatsD:", line) log.Debugln("Bad line from StatsD:", line)
return events return events
} }
@ -443,10 +443,11 @@ func lineToEvents(line string) Events {
} }
samples: samples:
for _, sample := range samples { for _, sample := range samples {
samplesReceived.Inc()
components := strings.Split(sample, "|") components := strings.Split(sample, "|")
samplingFactor := 1.0 samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 { if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc() sampleErrors.WithLabelValues("malformed_component").Inc()
log.Debugln("Bad component on line:", line) log.Debugln("Bad component on line:", line)
continue continue
} }
@ -460,7 +461,7 @@ samples:
value, err := strconv.ParseFloat(valueStr, 64) value, err := strconv.ParseFloat(valueStr, 64)
if err != nil { if err != nil {
log.Debugf("Bad value %s on line: %s", valueStr, line) log.Debugf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc() sampleErrors.WithLabelValues("malformed_value").Inc()
continue continue
} }
@ -470,7 +471,7 @@ samples:
for _, component := range components[2:] { for _, component := range components[2:] {
if len(component) == 0 { if len(component) == 0 {
log.Debugln("Empty component on line: ", line) log.Debugln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc() sampleErrors.WithLabelValues("malformed_component").Inc()
continue samples continue samples
} }
} }
@ -480,13 +481,13 @@ samples:
case '@': case '@':
if statType != "c" && statType != "ms" { if statType != "c" && statType != "ms" {
log.Debugln("Illegal sampling factor for non-counter metric on line", line) log.Debugln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc() sampleErrors.WithLabelValues("illegal_sample_factor").Inc()
continue continue
} }
samplingFactor, err = strconv.ParseFloat(component[1:], 64) samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil { if err != nil {
log.Debugf("Invalid sampling factor %s on line %s", component[1:], line) log.Debugf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc() sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
} }
if samplingFactor == 0 { if samplingFactor == 0 {
samplingFactor = 1 samplingFactor = 1
@ -501,7 +502,7 @@ samples:
labels = parseDogStatsDTagsToLabels(component) labels = parseDogStatsDTagsToLabels(component)
default: default:
log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line) log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc() sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
continue continue
} }
} }
@ -511,12 +512,11 @@ samples:
event, err := buildEvent(statType, metric, value, relative, labels) event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil { if err != nil {
log.Debugf("Error building event on line %s: %s", line, err) log.Debugf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc() sampleErrors.WithLabelValues("illegal_event").Inc()
continue continue
} }
events = append(events, event) events = append(events, event)
} }
networkStats.WithLabelValues("legal").Inc()
} }
return events return events
} }
@ -537,9 +537,11 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) {
} }
func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) {
udpPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
events := Events{} events := Events{}
for _, line := range lines { for _, line := range lines {
linesReceived.Inc()
events = append(events, lineToEvents(line)...) events = append(events, lineToEvents(line)...)
} }
e <- events e <- events
@ -562,21 +564,24 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) {
func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
defer c.Close() defer c.Close()
tcpConnections.Inc()
r := bufio.NewReader(c) r := bufio.NewReader(c)
for { for {
line, isPrefix, err := r.ReadLine() line, isPrefix, err := r.ReadLine()
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
networkStats.WithLabelValues("tcp_error").Inc() tcpErrors.Inc()
log.Debugf("Read %s failed: %v", c.RemoteAddr(), err) log.Debugf("Read %s failed: %v", c.RemoteAddr(), err)
} }
break break
} }
if isPrefix { if isPrefix {
networkStats.WithLabelValues("tcp_line_too_long").Inc() tcpLineTooLong.Inc()
log.Debugf("Read %s failed: line too long", c.RemoteAddr()) log.Debugf("Read %s failed: line too long", c.RemoteAddr())
break break
} }
linesReceived.Inc()
e <- lineToEvents(string(line)) e <- lineToEvents(string(line))
} }
} }

View file

@ -29,12 +29,60 @@ var (
Name: "statsd_exporter_events_unmapped_total", Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.", Help: "The total number of StatsD events no mapping was found for.",
}) })
networkStats = prometheus.NewCounterVec( udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_exporter_packets_total", Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets seen.", Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsign DogStatsD tags.",
}, },
[]string{"type"},
) )
configLoads = prometheus.NewCounterVec( configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
@ -58,7 +106,15 @@ var (
func init() { func init() {
prometheus.MustRegister(eventStats) prometheus.MustRegister(eventStats)
prometheus.MustRegister(networkStats) prometheus.MustRegister(udpPackets)
prometheus.MustRegister(tcpConnections)
prometheus.MustRegister(tcpErrors)
prometheus.MustRegister(tcpLineTooLong)
prometheus.MustRegister(linesReceived)
prometheus.MustRegister(samplesReceived)
prometheus.MustRegister(sampleErrors)
prometheus.MustRegister(tagsReceived)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(configLoads) prometheus.MustRegister(configLoads)
prometheus.MustRegister(mappingsCount) prometheus.MustRegister(mappingsCount)
prometheus.MustRegister(conflictingEventStats) prometheus.MustRegister(conflictingEventStats)