mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-11-22 23:41:00 +00:00
c7e76967c8
At high traffic levels, the locking around sending on channels can cause a large amount of blocking and CPU usage. This adds an event queue mechanism so that events are queued for a short period of time, and flushed in batches to the main exporter goroutine periodically. The default is to flush every 1000 events, or every 200ms, whichever happens first. Signed-off-by: Clayton O'Neill <claytono@github.com>
550 lines
14 KiB
Go
550 lines
14 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/log"
|
|
|
|
"github.com/prometheus/statsd_exporter/pkg/clock"
|
|
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
|
)
|
|
|
|
const (
	// defaultHelp is the help text used for a metric whose mapping does not
	// provide a help text of its own.
	defaultHelp = "Metric autogenerated by statsd_exporter."

	// regErrF is the log format used when a metric cannot be obtained from
	// the registry. Its arguments are the metric name and the error.
	regErrF = "Failed to update metric %q. Error: %s"
)
|
|
|
|
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
|
|
// This allows incoming metrics to have inconsistent label sets
|
|
type uncheckedCollector struct {
|
|
c prometheus.Collector
|
|
}
|
|
|
|
func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}
|
|
func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
|
|
u.c.Collect(c)
|
|
}
|
|
|
|
type Exporter struct {
|
|
mapper *mapper.MetricMapper
|
|
registry *registry
|
|
}
|
|
|
|
// escapeMetricName replaces invalid characters in the metric name with "_".
// Valid characters are a-z, A-Z, 0-9, and _.
//
// Every invalid rune, and every byte that is not part of a valid UTF-8
// sequence, is replaced by exactly one "_". If the name starts with a digit,
// an additional "_" is prepended. An empty name yields "". When nothing needs
// escaping the input string is returned as is, without allocating.
func escapeMetricName(metricName string) string {
	metricLen := len(metricName)
	if metricLen == 0 {
		return ""
	}

	escaped := false
	var sb strings.Builder
	// If a metric starts with a digit, allocate the memory and prepend an
	// underscore.
	if metricName[0] >= '0' && metricName[0] <= '9' {
		escaped = true
		sb.Grow(metricLen + 1)
		sb.WriteByte('_')
	}

	// This is a character replacement method optimized for this limited
	// use case. It is much faster than using a regex.
	//
	// offset is the index of the first byte not yet copied into sb.
	offset := 0
	for i, c := range metricName {
		if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
			(c >= '0' && c <= '9') || (c == '_') {
			// Character is valid, so skip over it without doing anything.
			continue
		}

		if !escaped {
			// Up until now we've been lazy and avoided actually allocating
			// memory. Unfortunately we've now determined this string needs
			// escaping, so allocate the buffer for the whole string.
			escaped = true
			sb.Grow(metricLen)
		}

		// Flush the run of valid characters seen since the last replacement.
		sb.WriteString(metricName[offset:i])

		// Take the width from the bytes actually present in the string. For
		// an invalid UTF-8 byte, range yields utf8.RuneError while advancing
		// a single byte, whereas utf8.RuneLen(utf8.RuneError) is 3; using
		// RuneLen here would skip following bytes or push offset past i.
		_, width := utf8.DecodeRuneInString(metricName[i:])
		offset = i + width
		sb.WriteByte('_')
	}

	if !escaped {
		// This is the happy path where nothing had to be escaped, so we can
		// avoid doing anything.
		return metricName
	}

	// Copy whatever valid tail follows the last replaced character.
	sb.WriteString(metricName[offset:])

	return sb.String()
}
|
|
|
|
// Listen handles all events sent to the given channel sequentially. It
|
|
// terminates when the channel is closed.
|
|
func (b *Exporter) Listen(e <-chan Events) {
|
|
removeStaleMetricsTicker := clock.NewTicker(time.Second)
|
|
|
|
for {
|
|
select {
|
|
case <-removeStaleMetricsTicker.C:
|
|
b.registry.removeStaleMetrics()
|
|
case events, ok := <-e:
|
|
if !ok {
|
|
log.Debug("Channel is closed. Break out of Exporter.Listener.")
|
|
removeStaleMetricsTicker.Stop()
|
|
return
|
|
}
|
|
for _, event := range events {
|
|
b.handleEvent(event)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvent processes a single Event according to the configured mapping.
|
|
func (b *Exporter) handleEvent(event Event) {
|
|
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
|
|
if mapping == nil {
|
|
mapping = &mapper.MetricMapping{}
|
|
if b.mapper.Defaults.Ttl != 0 {
|
|
mapping.Ttl = b.mapper.Defaults.Ttl
|
|
}
|
|
}
|
|
|
|
if mapping.Action == mapper.ActionTypeDrop {
|
|
eventsActions.WithLabelValues("drop").Inc()
|
|
return
|
|
}
|
|
|
|
help := defaultHelp
|
|
if mapping.HelpText != "" {
|
|
help = mapping.HelpText
|
|
}
|
|
|
|
metricName := ""
|
|
prometheusLabels := event.Labels()
|
|
if present {
|
|
if mapping.Name == "" {
|
|
log.Debugf("The mapping of '%s' for match '%s' generates an empty metric name", event.MetricName(), mapping.Match)
|
|
errorEventStats.WithLabelValues("empty_metric_name").Inc()
|
|
return
|
|
}
|
|
metricName = escapeMetricName(mapping.Name)
|
|
for label, value := range labels {
|
|
prometheusLabels[label] = value
|
|
}
|
|
eventsActions.WithLabelValues(string(mapping.Action)).Inc()
|
|
} else {
|
|
eventsUnmapped.Inc()
|
|
metricName = escapeMetricName(event.MetricName())
|
|
}
|
|
|
|
switch ev := event.(type) {
|
|
case *CounterEvent:
|
|
// We don't accept negative values for counters. Incrementing the counter with a negative number
|
|
// will cause the exporter to panic. Instead we will warn and continue to the next event.
|
|
if event.Value() < 0.0 {
|
|
log.Debugf("Counter %q is: '%f' (counter must be non-negative value)", metricName, event.Value())
|
|
errorEventStats.WithLabelValues("illegal_negative_counter").Inc()
|
|
return
|
|
}
|
|
|
|
counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping)
|
|
if err == nil {
|
|
counter.Add(event.Value())
|
|
eventStats.WithLabelValues("counter").Inc()
|
|
} else {
|
|
log.Debugf(regErrF, metricName, err)
|
|
conflictingEventStats.WithLabelValues("counter").Inc()
|
|
}
|
|
|
|
case *GaugeEvent:
|
|
gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping)
|
|
|
|
if err == nil {
|
|
if ev.relative {
|
|
gauge.Add(event.Value())
|
|
} else {
|
|
gauge.Set(event.Value())
|
|
}
|
|
eventStats.WithLabelValues("gauge").Inc()
|
|
} else {
|
|
log.Debugf(regErrF, metricName, err)
|
|
conflictingEventStats.WithLabelValues("gauge").Inc()
|
|
}
|
|
|
|
case *TimerEvent:
|
|
t := mapper.TimerTypeDefault
|
|
if mapping != nil {
|
|
t = mapping.TimerType
|
|
}
|
|
if t == mapper.TimerTypeDefault {
|
|
t = b.mapper.Defaults.TimerType
|
|
}
|
|
|
|
switch t {
|
|
case mapper.TimerTypeHistogram:
|
|
histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping)
|
|
if err == nil {
|
|
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
|
eventStats.WithLabelValues("timer").Inc()
|
|
} else {
|
|
log.Debugf(regErrF, metricName, err)
|
|
conflictingEventStats.WithLabelValues("timer").Inc()
|
|
}
|
|
|
|
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
|
|
summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping)
|
|
if err == nil {
|
|
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
|
|
eventStats.WithLabelValues("timer").Inc()
|
|
} else {
|
|
log.Debugf(regErrF, metricName, err)
|
|
conflictingEventStats.WithLabelValues("timer").Inc()
|
|
}
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown timer type '%s'", t))
|
|
}
|
|
|
|
default:
|
|
log.Debugln("Unsupported event type")
|
|
eventStats.WithLabelValues("illegal").Inc()
|
|
}
|
|
}
|
|
|
|
func NewExporter(mapper *mapper.MetricMapper) *Exporter {
|
|
return &Exporter{
|
|
mapper: mapper,
|
|
registry: newRegistry(mapper),
|
|
}
|
|
}
|
|
|
|
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) {
|
|
switch statType {
|
|
case "c":
|
|
return &CounterEvent{
|
|
metricName: metric,
|
|
value: float64(value),
|
|
labels: labels,
|
|
}, nil
|
|
case "g":
|
|
return &GaugeEvent{
|
|
metricName: metric,
|
|
value: float64(value),
|
|
relative: relative,
|
|
labels: labels,
|
|
}, nil
|
|
case "ms", "h", "d":
|
|
return &TimerEvent{
|
|
metricName: metric,
|
|
value: float64(value),
|
|
labels: labels,
|
|
}, nil
|
|
case "s":
|
|
return nil, fmt.Errorf("no support for StatsD sets")
|
|
default:
|
|
return nil, fmt.Errorf("bad stat type %s", statType)
|
|
}
|
|
}
|
|
|
|
func handleDogStatsDTagToKeyValue(labels map[string]string, component, tag string) {
|
|
// Bail early if the tag is empty
|
|
if len(tag) == 0 {
|
|
tagErrors.Inc()
|
|
log.Debugf("Malformed or empty DogStatsD tag %s in component %s", tag, component)
|
|
return
|
|
}
|
|
// Skip hash if found.
|
|
if tag[0] == '#' {
|
|
tag = tag[1:]
|
|
}
|
|
|
|
// find the first comma and split the tag into key and value.
|
|
var k, v string
|
|
for i, c := range tag {
|
|
if c == ':' {
|
|
k = tag[0:i]
|
|
v = tag[(i + 1):]
|
|
break
|
|
}
|
|
}
|
|
// If either of them is empty, then either the k or v is empty, or we
|
|
// didn't find a colon, either way, throw an error and skip ahead.
|
|
if len(k) == 0 || len(v) == 0 {
|
|
tagErrors.Inc()
|
|
log.Debugf("Malformed or empty DogStatsD tag %s in component %s", tag, component)
|
|
return
|
|
}
|
|
|
|
labels[escapeMetricName(k)] = v
|
|
|
|
return
|
|
}
|
|
|
|
func parseDogStatsDTagsToLabels(component string) map[string]string {
|
|
labels := map[string]string{}
|
|
tagsReceived.Inc()
|
|
|
|
lastTagEndIndex := 0
|
|
for i, c := range component {
|
|
if c == ',' {
|
|
tag := component[lastTagEndIndex:i]
|
|
lastTagEndIndex = i + 1
|
|
handleDogStatsDTagToKeyValue(labels, component, tag)
|
|
}
|
|
}
|
|
|
|
// If we're not off the end of the string, add the last tag
|
|
if lastTagEndIndex < len(component) {
|
|
tag := component[lastTagEndIndex:]
|
|
handleDogStatsDTagToKeyValue(labels, component, tag)
|
|
}
|
|
|
|
return labels
|
|
}
|
|
|
|
func lineToEvents(line string) Events {
|
|
events := Events{}
|
|
if line == "" {
|
|
return events
|
|
}
|
|
|
|
elements := strings.SplitN(line, ":", 2)
|
|
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
|
|
sampleErrors.WithLabelValues("malformed_line").Inc()
|
|
log.Debugln("Bad line from StatsD:", line)
|
|
return events
|
|
}
|
|
metric := elements[0]
|
|
var samples []string
|
|
if strings.Contains(elements[1], "|#") {
|
|
// using datadog extensions, disable multi-metrics
|
|
samples = elements[1:]
|
|
} else {
|
|
samples = strings.Split(elements[1], ":")
|
|
}
|
|
samples:
|
|
for _, sample := range samples {
|
|
samplesReceived.Inc()
|
|
components := strings.Split(sample, "|")
|
|
samplingFactor := 1.0
|
|
if len(components) < 2 || len(components) > 4 {
|
|
sampleErrors.WithLabelValues("malformed_component").Inc()
|
|
log.Debugln("Bad component on line:", line)
|
|
continue
|
|
}
|
|
valueStr, statType := components[0], components[1]
|
|
|
|
var relative = false
|
|
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
|
|
relative = true
|
|
}
|
|
|
|
value, err := strconv.ParseFloat(valueStr, 64)
|
|
if err != nil {
|
|
log.Debugf("Bad value %s on line: %s", valueStr, line)
|
|
sampleErrors.WithLabelValues("malformed_value").Inc()
|
|
continue
|
|
}
|
|
|
|
multiplyEvents := 1
|
|
labels := map[string]string{}
|
|
if len(components) >= 3 {
|
|
for _, component := range components[2:] {
|
|
if len(component) == 0 {
|
|
log.Debugln("Empty component on line: ", line)
|
|
sampleErrors.WithLabelValues("malformed_component").Inc()
|
|
continue samples
|
|
}
|
|
}
|
|
|
|
for _, component := range components[2:] {
|
|
switch component[0] {
|
|
case '@':
|
|
if statType != "c" && statType != "ms" {
|
|
log.Debugln("Illegal sampling factor for non-counter metric on line", line)
|
|
sampleErrors.WithLabelValues("illegal_sample_factor").Inc()
|
|
continue
|
|
}
|
|
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
|
|
if err != nil {
|
|
log.Debugf("Invalid sampling factor %s on line %s", component[1:], line)
|
|
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
|
|
}
|
|
if samplingFactor == 0 {
|
|
samplingFactor = 1
|
|
}
|
|
|
|
if statType == "c" {
|
|
value /= samplingFactor
|
|
} else if statType == "ms" {
|
|
multiplyEvents = int(1 / samplingFactor)
|
|
}
|
|
case '#':
|
|
labels = parseDogStatsDTagsToLabels(component[1:])
|
|
default:
|
|
log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line)
|
|
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
for i := 0; i < multiplyEvents; i++ {
|
|
event, err := buildEvent(statType, metric, value, relative, labels)
|
|
if err != nil {
|
|
log.Debugf("Error building event on line %s: %s", line, err)
|
|
sampleErrors.WithLabelValues("illegal_event").Inc()
|
|
continue
|
|
}
|
|
events = append(events, event)
|
|
}
|
|
}
|
|
return events
|
|
}
|
|
|
|
type StatsDUDPListener struct {
|
|
conn *net.UDPConn
|
|
eventHandler eventHandler
|
|
}
|
|
|
|
func (l *StatsDUDPListener) SetEventHandler(eh eventHandler) {
|
|
l.eventHandler = eh
|
|
}
|
|
|
|
func (l *StatsDUDPListener) Listen() {
|
|
buf := make([]byte, 65535)
|
|
for {
|
|
n, _, err := l.conn.ReadFromUDP(buf)
|
|
if err != nil {
|
|
// https://github.com/golang/go/issues/4373
|
|
// ignore net: errClosing error as it will occur during shutdown
|
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
|
return
|
|
}
|
|
log.Error(err)
|
|
return
|
|
}
|
|
l.handlePacket(buf[0:n])
|
|
}
|
|
}
|
|
|
|
func (l *StatsDUDPListener) handlePacket(packet []byte) {
|
|
udpPackets.Inc()
|
|
lines := strings.Split(string(packet), "\n")
|
|
for _, line := range lines {
|
|
linesReceived.Inc()
|
|
l.eventHandler.queue(lineToEvents(line))
|
|
}
|
|
}
|
|
|
|
type StatsDTCPListener struct {
|
|
conn *net.TCPListener
|
|
eventHandler eventHandler
|
|
}
|
|
|
|
func (l *StatsDTCPListener) SetEventHandler(eh eventHandler) {
|
|
l.eventHandler = eh
|
|
}
|
|
|
|
func (l *StatsDTCPListener) Listen() {
|
|
for {
|
|
c, err := l.conn.AcceptTCP()
|
|
if err != nil {
|
|
// https://github.com/golang/go/issues/4373
|
|
// ignore net: errClosing error as it will occur during shutdown
|
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
|
return
|
|
}
|
|
log.Fatalf("AcceptTCP failed: %v", err)
|
|
}
|
|
go l.handleConn(c)
|
|
}
|
|
}
|
|
|
|
func (l *StatsDTCPListener) handleConn(c *net.TCPConn) {
|
|
defer c.Close()
|
|
|
|
tcpConnections.Inc()
|
|
|
|
r := bufio.NewReader(c)
|
|
for {
|
|
line, isPrefix, err := r.ReadLine()
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
tcpErrors.Inc()
|
|
log.Debugf("Read %s failed: %v", c.RemoteAddr(), err)
|
|
}
|
|
break
|
|
}
|
|
if isPrefix {
|
|
tcpLineTooLong.Inc()
|
|
log.Debugf("Read %s failed: line too long", c.RemoteAddr())
|
|
break
|
|
}
|
|
linesReceived.Inc()
|
|
l.eventHandler.queue(lineToEvents(string(line)))
|
|
}
|
|
}
|
|
|
|
type StatsDUnixgramListener struct {
|
|
conn *net.UnixConn
|
|
eventHandler eventHandler
|
|
}
|
|
|
|
func (l *StatsDUnixgramListener) SetEventHandler(eh eventHandler) {
|
|
l.eventHandler = eh
|
|
}
|
|
|
|
func (l *StatsDUnixgramListener) Listen() {
|
|
buf := make([]byte, 65535)
|
|
for {
|
|
n, _, err := l.conn.ReadFromUnix(buf)
|
|
if err != nil {
|
|
// https://github.com/golang/go/issues/4373
|
|
// ignore net: errClosing error as it will occur during shutdown
|
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
|
return
|
|
}
|
|
log.Fatal(err)
|
|
}
|
|
l.handlePacket(buf[:n])
|
|
}
|
|
}
|
|
|
|
func (l *StatsDUnixgramListener) handlePacket(packet []byte) {
|
|
unixgramPackets.Inc()
|
|
lines := strings.Split(string(packet), "\n")
|
|
for _, line := range lines {
|
|
linesReceived.Inc()
|
|
l.eventHandler.queue(lineToEvents(string(line)))
|
|
}
|
|
}
|