Merge pull request #106 from prometheus/mr/rework-sample-counters

Break out the telemetry during sample processing
This commit is contained in:
Matthias Rampke 2017-11-15 09:42:46 +01:00 committed by GitHub
commit d9db83b9be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 18 deletions

View file

@ -402,14 +402,14 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma
func parseDogStatsDTagsToLabels(component string) map[string]string { func parseDogStatsDTagsToLabels(component string) map[string]string {
labels := map[string]string{} labels := map[string]string{}
networkStats.WithLabelValues("dogstatsd_tags").Inc() tagsReceived.Inc()
tags := strings.Split(component, ",") tags := strings.Split(component, ",")
for _, t := range tags { for _, t := range tags {
t = strings.TrimPrefix(t, "#") t = strings.TrimPrefix(t, "#")
kv := strings.SplitN(t, ":", 2) kv := strings.SplitN(t, ":", 2)
if len(kv) < 2 || len(kv[1]) == 0 { if len(kv) < 2 || len(kv[1]) == 0 {
networkStats.WithLabelValues("malformed_dogstatsd_tag").Inc() tagErrors.Inc()
log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component) log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component)
continue continue
} }
@ -427,7 +427,7 @@ func lineToEvents(line string) Events {
elements := strings.SplitN(line, ":", 2) elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc() sampleErrors.WithLabelValues("malformed_line").Inc()
log.Debugln("Bad line from StatsD:", line) log.Debugln("Bad line from StatsD:", line)
return events return events
} }
@ -441,10 +441,11 @@ func lineToEvents(line string) Events {
} }
samples: samples:
for _, sample := range samples { for _, sample := range samples {
samplesReceived.Inc()
components := strings.Split(sample, "|") components := strings.Split(sample, "|")
samplingFactor := 1.0 samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 { if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc() sampleErrors.WithLabelValues("malformed_component").Inc()
log.Debugln("Bad component on line:", line) log.Debugln("Bad component on line:", line)
continue continue
} }
@ -458,7 +459,7 @@ samples:
value, err := strconv.ParseFloat(valueStr, 64) value, err := strconv.ParseFloat(valueStr, 64)
if err != nil { if err != nil {
log.Debugf("Bad value %s on line: %s", valueStr, line) log.Debugf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc() sampleErrors.WithLabelValues("malformed_value").Inc()
continue continue
} }
@ -468,7 +469,7 @@ samples:
for _, component := range components[2:] { for _, component := range components[2:] {
if len(component) == 0 { if len(component) == 0 {
log.Debugln("Empty component on line: ", line) log.Debugln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc() sampleErrors.WithLabelValues("malformed_component").Inc()
continue samples continue samples
} }
} }
@ -478,13 +479,13 @@ samples:
case '@': case '@':
if statType != "c" && statType != "ms" { if statType != "c" && statType != "ms" {
log.Debugln("Illegal sampling factor for non-counter metric on line", line) log.Debugln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc() sampleErrors.WithLabelValues("illegal_sample_factor").Inc()
continue continue
} }
samplingFactor, err = strconv.ParseFloat(component[1:], 64) samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil { if err != nil {
log.Debugf("Invalid sampling factor %s on line %s", component[1:], line) log.Debugf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc() sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
} }
if samplingFactor == 0 { if samplingFactor == 0 {
samplingFactor = 1 samplingFactor = 1
@ -499,7 +500,7 @@ samples:
labels = parseDogStatsDTagsToLabels(component) labels = parseDogStatsDTagsToLabels(component)
default: default:
log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line) log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc() sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
continue continue
} }
} }
@ -509,12 +510,11 @@ samples:
event, err := buildEvent(statType, metric, value, relative, labels) event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil { if err != nil {
log.Debugf("Error building event on line %s: %s", line, err) log.Debugf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc() sampleErrors.WithLabelValues("illegal_event").Inc()
continue continue
} }
events = append(events, event) events = append(events, event)
} }
networkStats.WithLabelValues("legal").Inc()
} }
return events return events
} }
@ -535,9 +535,11 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) {
} }
func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) {
udpPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
events := Events{} events := Events{}
for _, line := range lines { for _, line := range lines {
linesReceived.Inc()
events = append(events, lineToEvents(line)...) events = append(events, lineToEvents(line)...)
} }
e <- events e <- events
@ -560,21 +562,24 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) {
func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
defer c.Close() defer c.Close()
tcpConnections.Inc()
r := bufio.NewReader(c) r := bufio.NewReader(c)
for { for {
line, isPrefix, err := r.ReadLine() line, isPrefix, err := r.ReadLine()
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
networkStats.WithLabelValues("tcp_error").Inc() tcpErrors.Inc()
log.Debugf("Read %s failed: %v", c.RemoteAddr(), err) log.Debugf("Read %s failed: %v", c.RemoteAddr(), err)
} }
break break
} }
if isPrefix { if isPrefix {
networkStats.WithLabelValues("tcp_line_too_long").Inc() tcpLineTooLong.Inc()
log.Debugf("Read %s failed: line too long", c.RemoteAddr()) log.Debugf("Read %s failed: line too long", c.RemoteAddr())
break break
} }
linesReceived.Inc()
e <- lineToEvents(string(line)) e <- lineToEvents(string(line))
} }
} }

View file

@ -29,12 +29,60 @@ var (
Name: "statsd_exporter_events_unmapped_total", Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.", Help: "The total number of StatsD events no mapping was found for.",
}) })
networkStats = prometheus.NewCounterVec( udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_exporter_packets_total", Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets seen.", Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsign DogStatsD tags.",
}, },
[]string{"type"},
) )
configLoads = prometheus.NewCounterVec( configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
@ -58,7 +106,15 @@ var (
func init() { func init() {
prometheus.MustRegister(eventStats) prometheus.MustRegister(eventStats)
prometheus.MustRegister(networkStats) prometheus.MustRegister(udpPackets)
prometheus.MustRegister(tcpConnections)
prometheus.MustRegister(tcpErrors)
prometheus.MustRegister(tcpLineTooLong)
prometheus.MustRegister(linesReceived)
prometheus.MustRegister(samplesReceived)
prometheus.MustRegister(sampleErrors)
prometheus.MustRegister(tagsReceived)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(configLoads) prometheus.MustRegister(configLoads)
prometheus.MustRegister(mappingsCount) prometheus.MustRegister(mappingsCount)
prometheus.MustRegister(conflictingEventStats) prometheus.MustRegister(conflictingEventStats)