statsd_exporter/pkg/exporter/exporter.go~

173 lines
5.8 KiB
Go
Raw Normal View History

package exporter
import (
"os"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/registry"
)
const (
defaultHelp = "Metric autogenerated by statsd_exporter."
regErrF = "Failed to update metric"
)
type Exporter struct {
Mapper *mapper.MetricMapper
Registry *registry.Registry
Logger log.Logger
}
// Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan event.Events, thisEvent event.Event, eventsActions prometheus.GaugeVec, eventsUnmapped prometheus.Gauge,
errorEventStats prometheus.GaugeVec, eventStats prometheus.GaugeVec, conflictingEventStats prometheus.GaugeVec, metricsCount prometheus.GaugeVec, l func(string, log.Logger)) {
removeStaleMetricsTicker := clock.NewTicker(time.Second)
for {
select {
case <-removeStaleMetricsTicker.C:
b.Registry.RemoveStaleMetrics()
case events, ok := <-e:
if !ok {
level.Debug(b.Logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.")
removeStaleMetricsTicker.Stop()
return
}
for _, event := range events {
b.handleEvent(event, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount, l)
}
}
}
}
// handleEvent processes a single Event according to the configured mapping.
func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions prometheus.GaugeVec, eventsUnmapped prometheus.Gauge,
errorEventStats prometheus.GaugeVec, eventStats prometheus.GaugeVec, conflictingEventStats prometheus.GaugeVec, metricsCount prometheus.GaugeVec, l func(string, log.Logger)) {
mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType())
if mapping == nil {
mapping = &mapper.MetricMapping{}
if b.Mapper.Defaults.Ttl != 0 {
mapping.Ttl = b.Mapper.Defaults.Ttl
}
}
if mapping.Action == mapper.ActionTypeDrop {
eventsActions.WithLabelValues("drop").Inc()
return
}
metricName := ""
help := defaultHelp
if mapping.HelpText != "" {
help = mapping.HelpText
}
prometheusLabels := thisEvent.Labels()
if present {
if mapping.Name == "" {
level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match)
errorEventStats.WithLabelValues("empty_metric_name").Inc()
return
}
metricName = mapper.EscapeMetricName(mapping.Name)
for label, value := range labels {
prometheusLabels[label] = value
}
eventsActions.WithLabelValues(string(mapping.Action)).Inc()
} else {
eventsUnmapped.Inc()
metricName = mapper.EscapeMetricName(thisEvent.MetricName())
}
switch ev := thisEvent.(type) {
case *event.CounterEvent:
// We don't accept negative values for counters. Incrementing the counter with a negative number
// will cause the exporter to panic. Instead we will warn and continue to the next event.
if thisEvent.Value() < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value())
errorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return
}
counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, &metricsCount)
if err == nil {
counter.Add(thisEvent.Value())
eventStats.WithLabelValues("counter").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("counter").Inc()
}
case *event.GaugeEvent:
gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, &metricsCount)
if err == nil {
if ev.GRelative {
gauge.Add(thisEvent.Value())
} else {
gauge.Set(thisEvent.Value())
}
eventStats.WithLabelValues("gauge").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("gauge").Inc()
}
case *event.TimerEvent:
t := mapper.TimerTypeDefault
if mapping != nil {
t = mapping.TimerType
}
if t == mapper.TimerTypeDefault {
t = b.Mapper.Defaults.TimerType
}
switch t {
case mapper.TimerTypeHistogram:
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, &metricsCount)
if err == nil {
histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
}
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, &metricsCount)
if err == nil {
summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
}
default:
level.Error(b.Logger).Log("msg", "unknown timer type", "type", t)
os.Exit(1)
}
default:
level.Debug(b.Logger).Log("msg", "Unsupported event type")
eventStats.WithLabelValues("illegal").Inc()
}
}
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter {
return &Exporter{
Mapper: mapper,
Registry: registry.NewRegistry(mapper),
Logger: logger,
}
}