From cdf79ba2f93aea5207dc0640ee38dbb76c9defdc Mon Sep 17 00:00:00 2001 From: Matthias Rampke Date: Fri, 10 Nov 2017 19:23:54 +0000 Subject: [PATCH] Break out the telemetry during sample processing The "packets" metric had heavily overloaded meaning for different "outcomes", and would often be incremented multiple times, sometimes even with a single (per-line) increment in one outcome corresponding to multiple increments in another. This removes the broken metric, and replaces it with separate total and error counters for each level of processing. This allows monitoring the network traffic handled separately from the samples incurred by it. --- exporter.go | 31 +++++++++++++----------- telemetry.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/exporter.go b/exporter.go index 6c026e1..e82497e 100644 --- a/exporter.go +++ b/exporter.go @@ -404,14 +404,14 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma func parseDogStatsDTagsToLabels(component string) map[string]string { labels := map[string]string{} - networkStats.WithLabelValues("dogstatsd_tags").Inc() + tagsReceived.Inc() tags := strings.Split(component, ",") for _, t := range tags { t = strings.TrimPrefix(t, "#") kv := strings.SplitN(t, ":", 2) if len(kv) < 2 || len(kv[1]) == 0 { - networkStats.WithLabelValues("malformed_dogstatsd_tag").Inc() + tagErrors.Inc() log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component) continue } @@ -429,7 +429,7 @@ func lineToEvents(line string) Events { elements := strings.SplitN(line, ":", 2) if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { - networkStats.WithLabelValues("malformed_line").Inc() + sampleErrors.WithLabelValues("malformed_line").Inc() log.Debugln("Bad line from StatsD:", line) return events } @@ -443,10 +443,11 @@ func lineToEvents(line string) Events { } samples: for _, sample := range samples { + 
samplesReceived.Inc() components := strings.Split(sample, "|") samplingFactor := 1.0 if len(components) < 2 || len(components) > 4 { - networkStats.WithLabelValues("malformed_component").Inc() + sampleErrors.WithLabelValues("malformed_component").Inc() log.Debugln("Bad component on line:", line) continue } @@ -460,7 +461,7 @@ samples: value, err := strconv.ParseFloat(valueStr, 64) if err != nil { log.Debugf("Bad value %s on line: %s", valueStr, line) - networkStats.WithLabelValues("malformed_value").Inc() + sampleErrors.WithLabelValues("malformed_value").Inc() continue } @@ -470,7 +471,7 @@ samples: for _, component := range components[2:] { if len(component) == 0 { log.Debugln("Empty component on line: ", line) - networkStats.WithLabelValues("malformed_component").Inc() + sampleErrors.WithLabelValues("malformed_component").Inc() continue samples } } @@ -480,13 +481,13 @@ samples: case '@': if statType != "c" && statType != "ms" { log.Debugln("Illegal sampling factor for non-counter metric on line", line) - networkStats.WithLabelValues("illegal_sample_factor").Inc() + sampleErrors.WithLabelValues("illegal_sample_factor").Inc() continue } samplingFactor, err = strconv.ParseFloat(component[1:], 64) if err != nil { log.Debugf("Invalid sampling factor %s on line %s", component[1:], line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() + sampleErrors.WithLabelValues("invalid_sample_factor").Inc() } if samplingFactor == 0 { samplingFactor = 1 @@ -501,7 +502,7 @@ samples: labels = parseDogStatsDTagsToLabels(component) default: log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() + sampleErrors.WithLabelValues("invalid_sample_factor").Inc() continue } } @@ -511,12 +512,11 @@ samples: event, err := buildEvent(statType, metric, value, relative, labels) if err != nil { log.Debugf("Error building event on line %s: %s", line, err) - 
networkStats.WithLabelValues("illegal_event").Inc() + sampleErrors.WithLabelValues("illegal_event").Inc() continue } events = append(events, event) } - networkStats.WithLabelValues("legal").Inc() } return events } @@ -537,9 +537,11 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) { } func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { + udpPackets.Inc() lines := strings.Split(string(packet), "\n") events := Events{} for _, line := range lines { + linesReceived.Inc() events = append(events, lineToEvents(line)...) } e <- events @@ -562,21 +564,24 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) { func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { defer c.Close() + tcpConnections.Inc() + r := bufio.NewReader(c) for { line, isPrefix, err := r.ReadLine() if err != nil { if err != io.EOF { - networkStats.WithLabelValues("tcp_error").Inc() + tcpErrors.Inc() log.Debugf("Read %s failed: %v", c.RemoteAddr(), err) } break } if isPrefix { - networkStats.WithLabelValues("tcp_line_too_long").Inc() + tcpLineTooLong.Inc() log.Debugf("Read %s failed: line too long", c.RemoteAddr()) break } + linesReceived.Inc() e <- lineToEvents(string(line)) } } diff --git a/telemetry.go b/telemetry.go index ae07fcf..92e174b 100644 --- a/telemetry.go +++ b/telemetry.go @@ -29,12 +29,60 @@ var ( Name: "statsd_exporter_events_unmapped_total", Help: "The total number of StatsD events no mapping was found for.", }) - networkStats = prometheus.NewCounterVec( + udpPackets = prometheus.NewCounter( prometheus.CounterOpts{ - Name: "statsd_exporter_packets_total", - Help: "The total number of StatsD packets seen.", + Name: "statsd_exporter_udp_packets_total", + Help: "The total number of StatsD packets received over UDP.", + }, + ) + tcpConnections = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connections_total", + Help: "The total number of TCP connections handled.", + }, + ) + tcpErrors = prometheus.NewCounter( + 
prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connection_errors_total", + Help: "The number of errors encountered reading from TCP.", + }, + ) + tcpLineTooLong = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_too_long_lines_total", + Help: "The number of lines discarded due to being too long.", + }, + ) + linesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_lines_total", + Help: "The total number of StatsD lines received.", + }, + ) + samplesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_samples_total", + Help: "The total number of StatsD samples received.", + }, + ) + sampleErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_sample_errors_total", + Help: "The total number of errors parsing StatsD samples.", + }, + []string{"reason"}, + ) + tagsReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tags_total", + Help: "The total number of DogStatsD tags processed.", + }, + ) + tagErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tag_errors_total", + Help: "The number of errors parsing DogStatsD tags.", }, - []string{"type"}, ) configLoads = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -58,7 +106,15 @@ var ( func init() { prometheus.MustRegister(eventStats) - prometheus.MustRegister(networkStats) + prometheus.MustRegister(udpPackets) + prometheus.MustRegister(tcpConnections) + prometheus.MustRegister(tcpErrors) + prometheus.MustRegister(tcpLineTooLong) + prometheus.MustRegister(linesReceived) + prometheus.MustRegister(samplesReceived) + prometheus.MustRegister(sampleErrors) + prometheus.MustRegister(tagsReceived) + prometheus.MustRegister(tagErrors) prometheus.MustRegister(configLoads) prometheus.MustRegister(mappingsCount) prometheus.MustRegister(conflictingEventStats)